001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Deque; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Executors; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 043import org.apache.hadoop.hbase.log.HBaseMarkers; 044import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 045import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 048import org.apache.hadoop.hbase.procedure2.util.StringUtils; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.IdLock; 052import org.apache.hadoop.hbase.util.NonceKey; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 063 064/** 065 * Thread Pool that executes the submitted procedures. 066 * The executor has a ProcedureStore associated. 067 * Each operation is logged and on restart the pending procedures are resumed. 068 * 069 * Unless the Procedure code throws an error (e.g. invalid user input) 070 * the procedure will complete (at some point in time), On restart the pending 071 * procedures are resumed and the once failed will be rolledback. 
072 * 073 * The user can add procedures to the executor via submitProcedure(proc) 074 * check for the finished state via isFinished(procId) 075 * and get the result via getResult(procId) 076 */ 077@InterfaceAudience.Private 078public class ProcedureExecutor<TEnvironment> { 079 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 080 081 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 082 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 083 084 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 085 "hbase.procedure.worker.keep.alive.time.msec"; 086 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 087 088 // Enable this flag if you want to upgrade to 2.2+, there are some incompatible changes on how we 089 // assign or unassign a region, so we need to make sure all these procedures have been finished 090 // before we start the master with new code. See HBASE-20881 and HBASE-21075 for more details. 091 public static final String UPGRADE_TO_2_2 = "hbase.procedure.upgrade-to-2-2"; 092 private static final boolean DEFAULT_UPGRADE_TO_2_2 = false; 093 094 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 095 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 096 097 public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; 098 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 099 100 /** 101 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to 102 * break PE having it fail at various junctures. When non-null, testing is set to an instance of 103 * the below internal {@link Testing} class with flags set for the particular test. 104 */ 105 Testing testing = null; 106 107 /** 108 * Class with parameters describing how to fail/die when in testing-context. 
109 */ 110 public static class Testing { 111 protected boolean killIfSuspended = false; 112 113 /** 114 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 115 * persisting all the state it needs to recover after a crash. 116 */ 117 protected boolean killBeforeStoreUpdate = false; 118 protected boolean toggleKillBeforeStoreUpdate = false; 119 120 /** 121 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 122 * is about a case where memory-state was being set after store to WAL where a crash could 123 * cause us to get stuck. This flag allows killing at what was a vulnerable time. 124 */ 125 protected boolean killAfterStoreUpdate = false; 126 protected boolean toggleKillAfterStoreUpdate = false; 127 128 protected boolean shouldKillBeforeStoreUpdate() { 129 final boolean kill = this.killBeforeStoreUpdate; 130 if (this.toggleKillBeforeStoreUpdate) { 131 this.killBeforeStoreUpdate = !kill; 132 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 133 } 134 return kill; 135 } 136 137 protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) { 138 return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate(); 139 } 140 141 protected boolean shouldKillAfterStoreUpdate() { 142 final boolean kill = this.killAfterStoreUpdate; 143 if (this.toggleKillAfterStoreUpdate) { 144 this.killAfterStoreUpdate = !kill; 145 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 146 } 147 return kill; 148 } 149 150 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 151 return (isSuspended && !killIfSuspended) ? 
false : shouldKillAfterStoreUpdate(); 152 } 153 } 154 155 public interface ProcedureExecutorListener { 156 void procedureLoaded(long procId); 157 void procedureAdded(long procId); 158 void procedureFinished(long procId); 159 } 160 161 /** 162 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 163 * Once a Root-Procedure completes (success or failure), the result will be added to this map. 164 * The user of ProcedureExecutor should call getResult(procId) to get the result. 165 */ 166 private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = 167 new ConcurrentHashMap<>(); 168 169 /** 170 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 171 * The RootProcedureState contains the execution stack of the Root-Procedure, 172 * It is added to the map by submitProcedure() and removed on procedure completion. 173 */ 174 private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = 175 new ConcurrentHashMap<>(); 176 177 /** 178 * Helper map to lookup the live procedures by ID. 179 * This map contains every procedure. root-procedures and subprocedures. 180 */ 181 private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = 182 new ConcurrentHashMap<>(); 183 184 /** 185 * Helper map to lookup whether the procedure already issued from the same client. This map 186 * contains every root procedure. 187 */ 188 private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); 189 190 private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = 191 new CopyOnWriteArrayList<>(); 192 193 private Configuration conf; 194 195 /** 196 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing 197 * resource handling rather than observing in a #join is unexpected). 
198 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 199 * (Should be ok). 200 */ 201 private ThreadGroup threadGroup; 202 203 /** 204 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 205 * resource handling rather than observing in a #join is unexpected). 206 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 207 * (Should be ok). 208 */ 209 private CopyOnWriteArrayList<WorkerThread> workerThreads; 210 211 /** 212 * Worker thread only for urgent tasks. 213 */ 214 private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads; 215 216 /** 217 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing 218 * resource handling rather than observing in a #join is unexpected). 219 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery 220 * (Should be ok). 221 */ 222 private TimeoutExecutorThread<TEnvironment> timeoutExecutor; 223 224 /** 225 * WorkerMonitor check for stuck workers and new worker thread when necessary, for example if 226 * there is no worker to assign meta, it will new worker thread for it, so it is very important. 227 * TimeoutExecutor execute many tasks like DeadServerMetricRegionChore RegionInTransitionChore 228 * and so on, some tasks may execute for a long time so will block other tasks like 229 * WorkerMonitor, so use a dedicated thread for executing WorkerMonitor. 230 */ 231 private TimeoutExecutorThread<TEnvironment> workerMonitorExecutor; 232 233 private int corePoolSize; 234 private int maxPoolSize; 235 private int urgentPoolSize; 236 237 private volatile long keepAliveTime; 238 239 /** 240 * Scheduler/Queue that contains runnable procedures. 
241 */ 242 private final ProcedureScheduler scheduler; 243 244 private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor( 245 new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build()); 246 247 private final AtomicLong lastProcId = new AtomicLong(-1); 248 private final AtomicLong workerId = new AtomicLong(0); 249 private final AtomicInteger activeExecutorCount = new AtomicInteger(0); 250 private final AtomicBoolean running = new AtomicBoolean(false); 251 private final TEnvironment environment; 252 private final ProcedureStore store; 253 254 private final boolean checkOwnerSet; 255 256 // To prevent concurrent execution of the same procedure. 257 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 258 // procedure is woken up before we finish the suspend which causes the same procedures to be 259 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 260 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 261 // execution of the same procedure. 
private final IdLock procExecutionLock = new IdLock();

  /** Value of {@link #UPGRADE_TO_2_2}, read once in the constructor. */
  private final boolean upgradeTo2_2;

  /**
   * Creates an executor that uses a {@link SimpleProcedureScheduler}.
   * @param conf configuration; see the four-argument constructor for the keys that are read
   * @param environment environment passed to procedure callbacks (replay, lock restore, ...)
   * @param store store used to persist procedure state
   */
  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
      final ProcedureStore store) {
    this(conf, environment, store, new SimpleProcedureScheduler());
  }

  /**
   * @return true if the root procedure of {@code proc} is no longer in the live procedure map, or
   *         is present and finished.
   */
  private boolean isRootFinished(Procedure<?> proc) {
    Procedure<?> rootProc = procedures.get(proc.getRootProcId());
    return rootProc == null || rootProc.isFinished();
  }

  /**
   * Re-writes the given procedure to the store in response to a store force-update request. Runs
   * under the per-procedure {@link #procExecutionLock} so it cannot interleave with execution of
   * the same procedure.
   * <p/>
   * Nothing is written when: the live procedure is finished, has a parent and its root is finished;
   * or the procedure is not live and its completed entry is missing, a {@code FailedProcedure}, or
   * already expired according to the eviction TTLs.
   * @param procId id of the procedure to re-persist
   * @throws IOException if acquiring the lock or updating the store fails
   */
  private void forceUpdateProcedure(long procId) throws IOException {
    IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId);
    try {
      Procedure<TEnvironment> proc = procedures.get(procId);
      if (proc != null) {
        if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) {
          LOG.debug("Procedure {} has already been finished and parent is succeeded," +
            " skip force updating", proc);
          return;
        }
      } else {
        CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId);
        if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) {
          LOG.debug("No pending procedure with id = {}, skip force updating.", procId);
          return;
        }
        long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL);
        long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL);
        // Use the injectable HBase clock rather than the raw system clock so expiry decisions
        // follow the same time source as the rest of the code (and tests that inject an edge).
        if (retainer.isExpired(EnvironmentEdgeManager.currentTime(), evictTtl, evictAckTtl)) {
          LOG.debug("Procedure {} has already been finished and expired, skip force updating",
            procId);
          return;
        }
        proc = retainer.getProcedure();
      }
      LOG.debug("Force update procedure {}", proc);
      store.update(proc);
    } finally {
      procExecutionLock.releaseLockEntry(lockEntry);
    }
  }

  /**
   * Creates an executor. Reads {@link #CHECK_OWNER_SET_CONF_KEY}, {@link #UPGRADE_TO_2_2} and
   * (via {@link #refreshConfiguration(Configuration)}) the worker keep-alive time, and registers a
   * store listener that hands force-update requests to a dedicated single-thread executor.
   * @param conf configuration
   * @param environment environment passed to procedure callbacks
   * @param store store used to persist procedure state
   * @param scheduler scheduler/queue holding runnable procedures
   */
  public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
      final ProcedureStore store, final ProcedureScheduler scheduler) {
    this.environment = environment;
    this.scheduler = scheduler;
    this.store = store;
    this.conf = conf;
    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET);
    this.upgradeTo2_2 = conf.getBoolean(UPGRADE_TO_2_2, DEFAULT_UPGRADE_TO_2_2);
    refreshConfiguration(conf);
    store.registerListener(new ProcedureStoreListener() {

      @Override
      public void forceUpdate(long[] procIds) {
        Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> {
          try {
            forceUpdateProcedure(procId);
          } catch (IOException e) {
            // Best effort: do not propagate from the force-update thread, but keep the cause so
            // the store failure is diagnosable.
            LOG.warn("Failed to force update procedure with pid={}", procId, e);
          }
        }));
      }
    });
  }

  /**
   * Replays the store into the (required to be empty) in-memory state.
   * @param abortOnCorruption if true, an IOException is thrown when corrupted procedures are found
   * @throws IOException on store failure or, with abortOnCorruption, on corrupted procedures
   */
  private void load(final boolean abortOnCorruption) throws IOException {
    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty");
    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty");
    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");

    store.load(new ProcedureStore.ProcedureLoader() {
      @Override
      public void setMaxProcId(long maxProcId) {
        assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()";
        lastProcId.set(maxProcId);
      }

      @Override
      public void load(ProcedureIterator procIter) throws IOException {
        loadProcedures(procIter, abortOnCorruption);
      }

      @Override
      public void handleCorrupted(ProcedureIterator procIter) throws IOException {
        int corruptedCount = 0;
        while (procIter.hasNext()) {
          Procedure<?> proc = procIter.next();
          LOG.error("Corrupt {}", proc);
          corruptedCount++;
        }
        if (abortOnCorruption && corruptedCount > 0) {
          throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay");
        }
      }
    });
  }

  /** Restores the lock of a single procedure and records it as restored. */
  private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) {
    proc.restoreLock(getEnvironment());
    restored.add(proc.getProcId());
  }

  /** Pops the stack (top first) restoring each procedure's lock, until the stack is empty. */
  private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) {
    while (!stack.isEmpty()) {
      restoreLock(stack.pop(), restored);
    }
  }

  // Restore the locks for all the procedures.
  // Notice that we need to restore the locks starting from the root proc, otherwise there will be
  // problem that a sub procedure may hold the exclusive lock first and then we are stuck when
  // calling the acquireLock method for the parent procedure.
  // The algorithm is straight-forward:
  // 1. Use a set to record the procedures which locks have already been restored.
  // 2. Use a stack to store the hierarchy of the procedures
  // 3. For all the procedure, we will first try to find its parent and push it into the stack,
  // unless
  // a. We have no parent, i.e, we are the root procedure
  // b. The lock has already been restored(by checking the set introduced in #1)
  // then we start to pop the stack and call acquireLock for each procedure.
  // Notice that this should be done for all procedures, not only the ones in runnableList.
  // NOTE(review): assumes every parent of a live procedure is itself in `procedures`; a missing
  // parent would make `proc` null and fail on the next loop iteration.
  private void restoreLocks() {
    Set<Long> restored = new HashSet<>();
    Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
    procedures.values().forEach(proc -> {
      for (;;) {
        if (restored.contains(proc.getProcId())) {
          restoreLocks(stack, restored);
          return;
        }
        if (!proc.hasParent()) {
          restoreLock(proc, restored);
          restoreLocks(stack, restored);
          return;
        }
        stack.push(proc);
        proc = procedures.get(proc.getParentProcId());
      }
    });
  }

  /**
   * Rebuilds in-memory state from the store iterator: completed/live maps, nonces, rollback stacks,
   * children latches and locks. It then hands timed-waiting procedures to the timeout executor and
   * failed/runnable ones to the scheduler.
   * @param procIter iterator over the stored procedures; it is traversed twice (reset in between)
   * @param abortOnCorruption currently unused here; corruption is handled by handleCorrupted
   * @throws IOException on iterator/store failure
   */
  private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
      throws IOException {
    // 1. Build the rollback stack
    int runnableCount = 0;
    int failedCount = 0;
    int waitingCount = 0;
    int waitingTimeoutCount = 0;
    while (procIter.hasNext()) {
      boolean finished = procIter.isNextFinished();
      Procedure<TEnvironment> proc = procIter.next();
      NonceKey nonceKey = proc.getNonceKey();
      long procId = proc.getProcId();

      if (finished) {
        completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
        LOG.debug("Completed {}", proc);
      } else {
        if (!proc.hasParent()) {
          assert !proc.isFinished() : "unexpected finished procedure";
          rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
        }

        // add the procedure to the map
        proc.beforeReplay(getEnvironment());
        procedures.put(proc.getProcId(), proc);
        // Count per state only to presize the lists built in step 2.
        switch (proc.getState()) {
          case RUNNABLE:
            runnableCount++;
            break;
          case FAILED:
            failedCount++;
            break;
          case WAITING:
            waitingCount++;
            break;
          case WAITING_TIMEOUT:
            waitingTimeoutCount++;
            break;
          default:
            break;
        }
      }

      // add the nonce to the map
      if (nonceKey != null) {
        nonceKeysToProcIdsMap.put(nonceKey, procId);
      }
    }

    // 2. Initialize the stacks
    // In the old implementation, for procedures in FAILED state, we will push it into the
    // ProcedureScheduler directly to execute the rollback. But this does not work after we
    // introduce the restore lock stage.
    // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
    // then when a procedure which has lock access, for example, a sub procedure of the procedure
    // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
    // workers poll from it. The assumption here is that, the procedure which has the xlock should
    // have been polled out already, so when loading we can not add the procedure to scheduler first
    // and then call acquireLock, since the procedure is still in the queue, and since we will
    // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
    List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
    List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
    List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
    List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
    procIter.reset();
    while (procIter.hasNext()) {
      if (procIter.isNextFinished()) {
        procIter.skipNext();
        continue;
      }

      Procedure<TEnvironment> proc = procIter.next();
      assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;

      LOG.debug("Loading {}", proc);

      Long rootProcId = getRootProcedureId(proc);
      // The orphan procedures will be passed to handleCorrupted, so add an assert here
      assert rootProcId != null;

      if (proc.hasParent()) {
        Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
        if (parent != null && !proc.isFinished()) {
          parent.incChildrenLatch();
        }
      }

      RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
      procStack.loadStack(proc);

      proc.setRootProcId(rootProcId);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableList.add(proc);
          break;
        case WAITING:
          waitingList.add(proc);
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutList.add(proc);
          break;
        case FAILED:
          failedList.add(proc);
          break;
        case ROLLEDBACK:
        case INITIALIZING:
          String msg = "Unexpected " + proc.getState() + " state for " + proc;
          LOG.error(msg);
          throw new UnsupportedOperationException(msg);
        default:
          break;
      }
    }

    // 3. Check the waiting procedures to see if some of them can be added to runnable.
    waitingList.forEach(proc -> {
      if (!proc.hasChildren()) {
        // Normally, WAITING procedures should be woken by their children.
        // But there is a case where all the children are successful and, before
        // they can wake up their parent procedure, the master was killed.
        // So, during recovery of the procedures from ProcedureWal, its children
        // are not loaded because of their SUCCESS state.
        // So we need to continue to run this WAITING procedure. But before
        // executing, we need to set its state to RUNNABLE, otherwise an exception
        // will be thrown:
        // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
        // "NOT RUNNABLE! " + procedure.toString());
        proc.setState(ProcedureState.RUNNABLE);
        runnableList.add(proc);
      } else {
        proc.afterReplay(getEnvironment());
      }
    });
    // 4. restore locks
    restoreLocks();

    // 5. Push the procedures to the timeout executor
    waitingTimeoutList.forEach(proc -> {
      proc.afterReplay(getEnvironment());
      timeoutExecutor.add(proc);
    });

    // 6. Push the procedure to the scheduler
    failedList.forEach(scheduler::addBack);
    runnableList.forEach(p -> {
      p.afterReplay(getEnvironment());
      if (!p.hasParent()) {
        sendProcedureLoadedNotification(p.getProcId());
      }
      scheduler.addBack(p);
    });
    // After all procedures put into the queue, signal the worker threads.
    // Otherwise, there is a race condition. See HBASE-21364.
    scheduler.signalAll();
  }

  /**
   * Initialize the procedure executor, but do not start workers. We will start them later.
   * <p/>
   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
   * ensure a single executor, and start the procedure replay to resume and recover the previous
   * pending and in-progress procedures.
   * @param numThreads number of threads available for procedure execution.
   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
   *          is found on replay. otherwise false.
   */
  public void init(int numThreads, boolean abortOnCorruption) throws IOException {
    init(numThreads, 0, abortOnCorruption);
  }

  /**
   * Initialize the procedure executor, but do not start workers. We will start them later.
   * <p/>
   * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
   * ensure a single executor, and start the procedure replay to resume and recover the previous
   * pending and in-progress procedures.
   * @param numThreads number of threads available for procedure execution.
   * @param urgentNumThreads number of threads available for urgent procedure execution.
   * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
   *          is found on replay. otherwise false.
   */
  public void init(int numThreads, int urgentNumThreads,
      boolean abortOnCorruption) throws IOException {
    // We have numThreads core workers (bursting up to 10x), urgentNumThreads urgent workers, plus
    // two timer threads: one for timing out / periodic procedures and one for the WorkerMonitor.
    this.corePoolSize = numThreads;
    this.maxPoolSize = 10 * numThreads;
    this.urgentPoolSize = urgentNumThreads;
    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker "
        + "count={}, start {} urgent thread(s)",
      corePoolSize, maxPoolSize, urgentPoolSize);

    this.threadGroup = new ThreadGroup("PEWorkerGroup");
    this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup, "ProcExecTimeout");
    this.workerMonitorExecutor = new TimeoutExecutorThread<>(this, threadGroup, "WorkerMonitor");

    // Create the workers
    workerId.set(0);
    workerThreads = new CopyOnWriteArrayList<>();
    urgentWorkerThreads = new CopyOnWriteArrayList<>();
    for (int i = 0; i < corePoolSize; ++i) {
      workerThreads.add(new WorkerThread(threadGroup));
    }
    for (int i = 0; i < urgentNumThreads; ++i) {
      urgentWorkerThreads
        .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true));
    }

    long st, et;

    // Acquire the store lease.
    st = System.nanoTime();
    store.recoverLease();
    et = System.nanoTime();
    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));

    // start the procedure scheduler
    scheduler.start();

    // TODO: Split in two steps.
    // TODO: Handle corrupted procedures (currently just a warn)
    // The first one will make sure that we have the latest id,
    // so we can start the threads and accept new procedures.
    // The second step will do the actual load of old procedures.
    st = System.nanoTime();
    load(abortOnCorruption);
    et = System.nanoTime();
    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
      StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st)));
  }

  /**
   * Start the workers. Starts both timer threads and all core/urgent workers, schedules the
   * WorkerMonitor, optionally the 2.2-upgrade exit check, and the completed-procedure cleaner.
   * A second call while already running only logs a warning.
   */
  public void startWorkers() throws IOException {
    if (!running.compareAndSet(false, true)) {
      LOG.warn("Already running");
      return;
    }
    // Start the executors. Here we must have the lastProcId set.
    LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(),
      urgentWorkerThreads.size());
    timeoutExecutor.start();
    workerMonitorExecutor.start();
    for (WorkerThread worker: workerThreads) {
      worker.start();
    }

    for (WorkerThread worker: urgentWorkerThreads) {
      worker.start();
    }

    // Internal chores
    workerMonitorExecutor.add(new WorkerMonitor());

    if (upgradeTo2_2) {
      timeoutExecutor.add(new InlineChore() {

        @Override
        public void run() {
          if (procedures.isEmpty()) {
            LOG.info("UPGRADE OK: All existed procedures have been finished, quit...");
            System.exit(0);
          }
        }

        @Override
        public int getTimeoutInterval() {
          // check every 5 seconds to see if we can quit
          return 5000;
        }
      });
    }

    // Add completed cleaner chore
    addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed,
      nonceKeysToProcIdsMap));
  }

  /**
   * Signals shutdown: stops the scheduler and sends the stop signal to both timer threads. Does
   * not wait; see {@link #join()}. No-op if not running.
   */
  public void stop() {
    if (!running.getAndSet(false)) {
      return;
    }

    LOG.info("Stopping");
    scheduler.stop();
    timeoutExecutor.sendStopSignal();
    workerMonitorExecutor.sendStopSignal();
  }

  /**
   * Waits for the timer and worker threads to terminate, destroys the thread group (dumping it to
   * STDOUT if threads are still alive) and clears all in-memory state. Must be called after
   * {@link #stop()}.
   */
  @VisibleForTesting
  public void join() {
    assert !isRunning() : "expected not running";

    // stop the timeout executor
    timeoutExecutor.awaitTermination();
    // stop the work monitor executor
    workerMonitorExecutor.awaitTermination();

    // stop the worker threads
    for (WorkerThread worker: workerThreads) {
      worker.awaitTermination();
    }

    // stop the urgent worker threads
    for (WorkerThread worker: urgentWorkerThreads) {
      worker.awaitTermination();
    }

    // Destroy the Thread Group for the executors
    // TODO: Fix. #join is not place to destroy resources.
    try {
      threadGroup.destroy();
    } catch (IllegalThreadStateException e) {
      LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT",
        this.threadGroup, e.getMessage());
      // This dumps list of threads on STDOUT.
      this.threadGroup.list();
    }

    // reset the in-memory state for testing
    completed.clear();
    rollbackStack.clear();
    procedures.clear();
    nonceKeysToProcIdsMap.clear();
    scheduler.clear();
    lastProcId.set(-1);
  }

  /**
   * Replaces the configuration and re-reads the worker keep-alive time from it.
   * @param conf the new configuration
   */
  public void refreshConfiguration(final Configuration conf) {
    this.conf = conf;
    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
      DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Accessors
  // ==========================================================================
  public boolean isRunning() {
    return running.get();
  }

  /**
   * @return the current number of worker threads (regular plus urgent).
   */
  public int getWorkerThreadCount() {
    return workerThreads.size() + urgentWorkerThreads.size();
  }

  /**
   * @return the core pool size settings.
   */
  public int getCorePoolSize() {
    return corePoolSize;
  }

  /** @return the number of urgent worker threads configured in init. */
  public int getUrgentPoolSize() {
    return urgentPoolSize;
  }

  public int getActiveExecutorCount() {
    return activeExecutorCount.get();
  }

  public TEnvironment getEnvironment() {
    return this.environment;
  }

  public ProcedureStore getStore() {
    return this.store;
  }

  ProcedureScheduler getScheduler() {
    return scheduler;
  }

  /**
   * Sets the worker keep-alive time (stored in milliseconds) and then calls
   * {@code scheduler.signalAll()}.
   * @param keepAliveTime the keep-alive duration
   * @param timeUnit unit of {@code keepAliveTime}
   */
  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
    this.scheduler.signalAll();
  }

  /** @return the worker keep-alive time converted to {@code timeUnit}. */
  public long getKeepAliveTime(final TimeUnit timeUnit) {
    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
  }

  // ==========================================================================
  //  Submit/Remove Chores
  // ==========================================================================

  /**
   * Add a chore procedure to the executor
   * @param chore the chore to add
   */
  public void addChore(ProcedureInMemoryChore<TEnvironment> chore) {
    chore.setState(ProcedureState.WAITING_TIMEOUT);
    timeoutExecutor.add(chore);
  }

  /**
   * Remove a chore procedure from the executor
   * @param chore the chore to remove
   * @return whether the chore is removed, or it will be removed later
   */
  public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) {
    chore.setState(ProcedureState.SUCCESS);
    return timeoutExecutor.remove(chore);
  }

  // ==========================================================================
  //  Nonce Procedure helpers
  // ==========================================================================
  /**
   * Create a NonceKey from the specified nonceGroup and nonce.
823 * @param nonceGroup 824 * @param nonce 825 * @return the generated NonceKey 826 */ 827 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 828 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 829 } 830 831 /** 832 * Register a nonce for a procedure that is going to be submitted. 833 * A procId will be reserved and on submitProcedure(), 834 * the procedure with the specified nonce will take the reserved ProcId. 835 * If someone already reserved the nonce, this method will return the procId reserved, 836 * otherwise an invalid procId will be returned. and the caller should procede 837 * and submit the procedure. 838 * 839 * @param nonceKey A unique identifier for this operation from the client or process. 840 * @return the procId associated with the nonce, if any otherwise an invalid procId. 841 */ 842 public long registerNonce(final NonceKey nonceKey) { 843 if (nonceKey == null) return -1; 844 845 // check if we have already a Reserved ID for the nonce 846 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 847 if (oldProcId == null) { 848 // reserve a new Procedure ID, this will be associated with the nonce 849 // and the procedure submitted with the specified nonce will use this ID. 850 final long newProcId = nextProcId(); 851 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 852 if (oldProcId == null) return -1; 853 } 854 855 // we found a registered nonce, but the procedure may not have been submitted yet. 856 // since the client expect the procedure to be submitted, spin here until it is. 
857 final boolean traceEnabled = LOG.isTraceEnabled(); 858 while (isRunning() && 859 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && 860 nonceKeysToProcIdsMap.containsKey(nonceKey)) { 861 if (traceEnabled) { 862 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 863 } 864 Threads.sleep(100); 865 } 866 return oldProcId.longValue(); 867 } 868 869 /** 870 * Remove the NonceKey if the procedure was not submitted to the executor. 871 * @param nonceKey A unique identifier for this operation from the client or process. 872 */ 873 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 874 if (nonceKey == null) return; 875 876 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 877 if (procId == null) return; 878 879 // if the procedure was not submitted, remove the nonce 880 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 881 nonceKeysToProcIdsMap.remove(nonceKey); 882 } 883 } 884 885 /** 886 * If the failure failed before submitting it, we may want to give back the 887 * same error to the requests with the same nonceKey. 
888 * 889 * @param nonceKey A unique identifier for this operation from the client or process 890 * @param procName name of the procedure, used to inform the user 891 * @param procOwner name of the owner of the procedure, used to inform the user 892 * @param exception the failure to report to the user 893 */ 894 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 895 IOException exception) { 896 if (nonceKey == null) { 897 return; 898 } 899 900 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 901 if (procId == null || completed.containsKey(procId)) { 902 return; 903 } 904 905 Procedure<TEnvironment> proc = 906 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 907 908 completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); 909 } 910 911 // ========================================================================== 912 // Submit/Abort Procedure 913 // ========================================================================== 914 /** 915 * Add a new root-procedure to the executor. 916 * @param proc the new procedure to execute. 917 * @return the procedure id, that can be used to monitor the operation 918 */ 919 public long submitProcedure(Procedure<TEnvironment> proc) { 920 return submitProcedure(proc, null); 921 } 922 923 /** 924 * Bypass a procedure. If the procedure is set to bypass, all the logic in 925 * execute/rollback will be ignored and it will return success, whatever. 926 * It is used to recover buggy stuck procedures, releasing the lock resources 927 * and letting other procedures run. Bypassing one procedure (and its ancestors will 928 * be bypassed automatically) may leave the cluster in a middle state, e.g. region 929 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, 930 * the operators may have to do some clean up on hdfs or schedule some assign procedures 931 * to let region online. DO AT YOUR OWN RISK. 
932 * <p> 933 * A procedure can be bypassed only if 934 * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT 935 * or it is a root procedure without any child. 936 * 2. No other worker thread is executing it 937 * 3. No child procedure has been submitted 938 * 939 * <p> 940 * If all the requirements are meet, the procedure and its ancestors will be 941 * bypassed and persisted to WAL. 942 * 943 * <p> 944 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. 945 * TODO: What about WAITING_TIMEOUT? 946 * @param pids the procedure id 947 * @param lockWait time to wait lock 948 * @param force if force set to true, we will bypass the procedure even if it is executing. 949 * This is for procedures which can't break out during executing(due to bug, mostly) 950 * In this case, bypassing the procedure is not enough, since it is already stuck 951 * there. We need to restart the master after bypassing, and letting the problematic 952 * procedure to execute wth bypass=true, so in that condition, the procedure can be 953 * successfully bypassed. 954 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
955 * @return true if bypass success 956 * @throws IOException IOException 957 */ 958 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 959 boolean recursive) 960 throws IOException { 961 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 962 for(long pid: pids) { 963 result.add(bypassProcedure(pid, lockWait, force, recursive)); 964 } 965 return result; 966 } 967 968 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 969 throws IOException { 970 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 971 final Procedure<TEnvironment> procedure = getProcedure(pid); 972 if (procedure == null) { 973 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 974 return false; 975 } 976 977 LOG.info("Begin bypass {} with lockWait={}, override={}, recursive={}", 978 procedure, lockWait, override, recursive); 979 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 980 if (lockEntry == null && !override) { 981 LOG.info("Waited {} ms, but {} is still running, skipping bypass with override={}", 982 lockWait, procedure, override); 983 return false; 984 } else if (lockEntry == null) { 985 LOG.info("Waited {} ms, but {} is still running, begin bypass with override={}", 986 lockWait, procedure, override); 987 } 988 try { 989 // check whether the procedure is already finished 990 if (procedure.isFinished()) { 991 LOG.info("{} is already finished, skipping bypass", procedure); 992 return false; 993 } 994 995 if (procedure.hasChildren()) { 996 if (recursive) { 997 // EXPENSIVE. Checks each live procedure of which there could be many!!! 998 // Is there another way to get children of a procedure? 999 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 1000 this.procedures.forEachValue(1 /*Single-threaded*/, 1001 // Transformer 1002 v -> { 1003 return v.getParentProcId() == procedure.getProcId()? 
v: null; 1004 }, 1005 // Consumer 1006 v -> { 1007 boolean result = false; 1008 IOException ioe = null; 1009 try { 1010 result = bypassProcedure(v.getProcId(), lockWait, override, recursive); 1011 } catch (IOException e) { 1012 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 1013 } 1014 }); 1015 } else { 1016 LOG.info("{} has children, skipping bypass", procedure); 1017 return false; 1018 } 1019 } 1020 1021 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 1022 if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 1023 && procedure.getState() != ProcedureState.WAITING 1024 && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { 1025 LOG.info("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 1026 + "(with no parent), {}", 1027 procedure); 1028 // Question: how is the bypass done here? 1029 return false; 1030 } 1031 1032 // Now, the procedure is not finished, and no one can execute it since we take the lock now 1033 // And we can be sure that its ancestor is not running too, since their child has not 1034 // finished yet 1035 Procedure<TEnvironment> current = procedure; 1036 while (current != null) { 1037 LOG.info("Bypassing {}", current); 1038 current.bypass(getEnvironment()); 1039 store.update(procedure); 1040 long parentID = current.getParentProcId(); 1041 current = getProcedure(parentID); 1042 } 1043 1044 //wake up waiting procedure, already checked there is no child 1045 if (procedure.getState() == ProcedureState.WAITING) { 1046 procedure.setState(ProcedureState.RUNNABLE); 1047 store.update(procedure); 1048 } 1049 1050 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
1051 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 1052 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1053 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 1054 if (timeoutExecutor.remove(procedure)) { 1055 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 1056 timeoutExecutor.executeTimedoutProcedure(procedure); 1057 } 1058 } else if (lockEntry != null) { 1059 scheduler.addFront(procedure); 1060 LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure); 1061 } else { 1062 // If we don't have the lock, we can't re-submit the queue, 1063 // since it is already executing. To get rid of the stuck situation, we 1064 // need to restart the master. With the procedure set to bypass, the procedureExecutor 1065 // will bypass it and won't get stuck again. 1066 LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, " 1067 + "skipping add to queue", procedure); 1068 } 1069 return true; 1070 1071 } finally { 1072 if (lockEntry != null) { 1073 procExecutionLock.releaseLockEntry(lockEntry); 1074 } 1075 } 1076 } 1077 1078 /** 1079 * Add a new root-procedure to the executor. 1080 * @param proc the new procedure to execute. 1081 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1082 * @return the procedure id, that can be used to monitor the operation 1083 */ 1084 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", 1085 justification = "FindBugs is blind to the check-for-null") 1086 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1087 Preconditions.checkArgument(lastProcId.get() >= 0); 1088 1089 prepareProcedure(proc); 1090 1091 final Long currentProcId; 1092 if (nonceKey != null) { 1093 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1094 Preconditions.checkArgument(currentProcId != null, 1095 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1096 } else { 1097 currentProcId = nextProcId(); 1098 } 1099 1100 // Initialize the procedure 1101 proc.setNonceKey(nonceKey); 1102 proc.setProcId(currentProcId.longValue()); 1103 1104 // Commit the transaction 1105 store.insert(proc, null); 1106 LOG.debug("Stored {}", proc); 1107 1108 // Add the procedure to the executor 1109 return pushProcedure(proc); 1110 } 1111 1112 /** 1113 * Add a set of new root-procedure to the executor. 1114 * @param procs the new procedures to execute. 1115 */ 1116 // TODO: Do we need to take nonces here? 
1117 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1118 Preconditions.checkArgument(lastProcId.get() >= 0); 1119 if (procs == null || procs.length <= 0) { 1120 return; 1121 } 1122 1123 // Prepare procedure 1124 for (int i = 0; i < procs.length; ++i) { 1125 prepareProcedure(procs[i]).setProcId(nextProcId()); 1126 } 1127 1128 // Commit the transaction 1129 store.insert(procs); 1130 if (LOG.isDebugEnabled()) { 1131 LOG.debug("Stored " + Arrays.toString(procs)); 1132 } 1133 // Add the procedure to the executor 1134 for (Procedure<TEnvironment> proc : procs) { 1135 pushProcedure(proc); 1136 } 1137 } 1138 1139 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1140 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1141 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1142 if (this.checkOwnerSet) { 1143 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1144 } 1145 return proc; 1146 } 1147 1148 private long pushProcedure(Procedure<TEnvironment> proc) { 1149 long currentProcId = proc.getProcId(); 1150 // If we are going to upgrade to 2.2+, and this is not a sub procedure, do not push it to 1151 // scheduler. After we finish all the ongoing procedures, the master will quit. 1152 if (upgradeTo2_2 && !proc.hasParent()) { 1153 return currentProcId; 1154 } 1155 1156 // Update metrics on start of a procedure 1157 proc.updateMetricsOnSubmit(getEnvironment()); 1158 1159 // Create the rollback stack for the procedure 1160 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1161 rollbackStack.put(currentProcId, stack); 1162 1163 // Submit the new subprocedures 1164 assert !procedures.containsKey(currentProcId); 1165 procedures.put(currentProcId, proc); 1166 sendProcedureAddedNotification(currentProcId); 1167 scheduler.addBack(proc); 1168 return proc.getProcId(); 1169 } 1170 1171 /** 1172 * Send an abort notification the specified procedure. 
1173 * Depending on the procedure implementation the abort can be considered or ignored. 1174 * @param procId the procedure to abort 1175 * @return true if the procedure exists and has received the abort, otherwise false. 1176 */ 1177 public boolean abort(long procId) { 1178 return abort(procId, true); 1179 } 1180 1181 /** 1182 * Send an abort notification to the specified procedure. 1183 * Depending on the procedure implementation, the abort can be considered or ignored. 1184 * @param procId the procedure to abort 1185 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1186 * @return true if the procedure exists and has received the abort, otherwise false. 1187 */ 1188 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1189 Procedure<TEnvironment> proc = procedures.get(procId); 1190 if (proc != null) { 1191 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1192 return false; 1193 } 1194 return proc.abort(getEnvironment()); 1195 } 1196 return false; 1197 } 1198 1199 // ========================================================================== 1200 // Executor query helpers 1201 // ========================================================================== 1202 public Procedure<TEnvironment> getProcedure(final long procId) { 1203 return procedures.get(procId); 1204 } 1205 1206 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1207 Procedure<TEnvironment> proc = getProcedure(procId); 1208 if (clazz.isInstance(proc)) { 1209 return clazz.cast(proc); 1210 } 1211 return null; 1212 } 1213 1214 public Procedure<TEnvironment> getResult(long procId) { 1215 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1216 if (retainer == null) { 1217 return null; 1218 } else { 1219 return retainer.getProcedure(); 1220 } 1221 } 1222 1223 /** 1224 * Return true if the procedure is finished. 1225 * The state may be "completed successfully" or "failed and rolledback". 
1226 * Use getResult() to check the state or get the result data. 1227 * @param procId the ID of the procedure to check 1228 * @return true if the procedure execution is finished, otherwise false. 1229 */ 1230 public boolean isFinished(final long procId) { 1231 return !procedures.containsKey(procId); 1232 } 1233 1234 /** 1235 * Return true if the procedure is started. 1236 * @param procId the ID of the procedure to check 1237 * @return true if the procedure execution is started, otherwise false. 1238 */ 1239 public boolean isStarted(long procId) { 1240 Procedure<?> proc = procedures.get(procId); 1241 if (proc == null) { 1242 return completed.get(procId) != null; 1243 } 1244 return proc.wasExecuted(); 1245 } 1246 1247 /** 1248 * Mark the specified completed procedure, as ready to remove. 1249 * @param procId the ID of the procedure to remove 1250 */ 1251 public void removeResult(long procId) { 1252 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1253 if (retainer == null) { 1254 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1255 LOG.debug("pid={} already removed by the cleaner.", procId); 1256 return; 1257 } 1258 1259 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1260 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1261 } 1262 1263 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1264 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1265 if (retainer == null) { 1266 return procedures.get(procId); 1267 } else { 1268 return retainer.getProcedure(); 1269 } 1270 } 1271 1272 /** 1273 * Check if the user is this procedure's owner 1274 * @param procId the target procedure 1275 * @param user the user 1276 * @return true if the user is the owner of the procedure, 1277 * false otherwise or the owner is unknown. 
1278 */ 1279 public boolean isProcedureOwner(long procId, User user) { 1280 if (user == null) { 1281 return false; 1282 } 1283 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1284 if (runningProc != null) { 1285 return runningProc.getOwner().equals(user.getShortName()); 1286 } 1287 1288 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1289 if (retainer != null) { 1290 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1291 } 1292 1293 // Procedure either does not exist or has already completed and got cleaned up. 1294 // At this time, we cannot check the owner of the procedure 1295 return false; 1296 } 1297 1298 1299 /** 1300 * Should only be used when starting up, where the procedure workers have not been started. 1301 * <p/> 1302 * If the procedure works has been started, the return values maybe changed when you are 1303 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1304 * it will do a copy, and also include the finished procedures. 1305 */ 1306 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1307 return procedures.values(); 1308 } 1309 1310 /** 1311 * Get procedures. 1312 * @return the procedures in a list 1313 */ 1314 public List<Procedure<TEnvironment>> getProcedures() { 1315 List<Procedure<TEnvironment>> procedureList = 1316 new ArrayList<>(procedures.size() + completed.size()); 1317 procedureList.addAll(procedures.values()); 1318 // Note: The procedure could show up twice in the list with different state, as 1319 // it could complete after we walk through procedures list and insert into 1320 // procedureList - it is ok, as we will use the information in the Procedure 1321 // to figure it out; to prevent this would increase the complexity of the logic. 
1322 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1323 .forEach(procedureList::add); 1324 return procedureList; 1325 } 1326 1327 // ========================================================================== 1328 // Listeners helpers 1329 // ========================================================================== 1330 public void registerListener(ProcedureExecutorListener listener) { 1331 this.listeners.add(listener); 1332 } 1333 1334 public boolean unregisterListener(ProcedureExecutorListener listener) { 1335 return this.listeners.remove(listener); 1336 } 1337 1338 private void sendProcedureLoadedNotification(final long procId) { 1339 if (!this.listeners.isEmpty()) { 1340 for (ProcedureExecutorListener listener: this.listeners) { 1341 try { 1342 listener.procedureLoaded(procId); 1343 } catch (Throwable e) { 1344 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1345 } 1346 } 1347 } 1348 } 1349 1350 private void sendProcedureAddedNotification(final long procId) { 1351 if (!this.listeners.isEmpty()) { 1352 for (ProcedureExecutorListener listener: this.listeners) { 1353 try { 1354 listener.procedureAdded(procId); 1355 } catch (Throwable e) { 1356 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1357 } 1358 } 1359 } 1360 } 1361 1362 private void sendProcedureFinishedNotification(final long procId) { 1363 if (!this.listeners.isEmpty()) { 1364 for (ProcedureExecutorListener listener: this.listeners) { 1365 try { 1366 listener.procedureFinished(procId); 1367 } catch (Throwable e) { 1368 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1369 } 1370 } 1371 } 1372 } 1373 1374 // ========================================================================== 1375 // Procedure IDs helpers 1376 // ========================================================================== 1377 private long nextProcId() { 1378 long procId = lastProcId.incrementAndGet(); 1379 if (procId < 0) { 
1380 while (!lastProcId.compareAndSet(procId, 0)) { 1381 procId = lastProcId.get(); 1382 if (procId >= 0) 1383 break; 1384 } 1385 while (procedures.containsKey(procId)) { 1386 procId = lastProcId.incrementAndGet(); 1387 } 1388 } 1389 assert procId >= 0 : "Invalid procId " + procId; 1390 return procId; 1391 } 1392 1393 @VisibleForTesting 1394 protected long getLastProcId() { 1395 return lastProcId.get(); 1396 } 1397 1398 @VisibleForTesting 1399 public Set<Long> getActiveProcIds() { 1400 return procedures.keySet(); 1401 } 1402 1403 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1404 return Procedure.getRootProcedureId(procedures, proc); 1405 } 1406 1407 // ========================================================================== 1408 // Executions 1409 // ========================================================================== 1410 private void executeProcedure(Procedure<TEnvironment> proc) { 1411 if (proc.isFinished()) { 1412 LOG.debug("{} is already finished, skipping execution", proc); 1413 return; 1414 } 1415 final Long rootProcId = getRootProcedureId(proc); 1416 if (rootProcId == null) { 1417 // The 'proc' was ready to run but the root procedure was rolledback 1418 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1419 executeRollback(proc); 1420 return; 1421 } 1422 1423 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1424 if (procStack == null) { 1425 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1426 return; 1427 } 1428 do { 1429 // Try to acquire the execution 1430 if (!procStack.acquire(proc)) { 1431 if (procStack.setRollback()) { 1432 // we have the 'rollback-lock' we can start rollingback 1433 switch (executeRollback(rootProcId, procStack)) { 1434 case LOCK_ACQUIRED: 1435 break; 1436 case LOCK_YIELD_WAIT: 1437 procStack.unsetRollback(); 1438 scheduler.yield(proc); 1439 break; 1440 case LOCK_EVENT_WAIT: 1441 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1442 procStack.unsetRollback(); 1443 break; 1444 default: 1445 throw new UnsupportedOperationException(); 1446 } 1447 } else { 1448 // if we can't rollback means that some child is still running. 1449 // the rollback will be executed after all the children are done. 1450 // If the procedure was never executed, remove and mark it as rolledback. 1451 if (!proc.wasExecuted()) { 1452 switch (executeRollback(proc)) { 1453 case LOCK_ACQUIRED: 1454 break; 1455 case LOCK_YIELD_WAIT: 1456 scheduler.yield(proc); 1457 break; 1458 case LOCK_EVENT_WAIT: 1459 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1460 break; 1461 default: 1462 throw new UnsupportedOperationException(); 1463 } 1464 } 1465 } 1466 break; 1467 } 1468 1469 // Execute the procedure 1470 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1471 // Note that lock is NOT about concurrency but rather about ensuring 1472 // ownership of a procedure of an entity such as a region or table 1473 LockState lockState = acquireLock(proc); 1474 switch (lockState) { 1475 case LOCK_ACQUIRED: 1476 execProcedure(procStack, proc); 1477 break; 1478 case LOCK_YIELD_WAIT: 1479 LOG.info(lockState + " " + proc); 1480 scheduler.yield(proc); 1481 break; 1482 case LOCK_EVENT_WAIT: 1483 // Someone will wake us up when the lock is available 1484 LOG.debug(lockState + " " + proc); 1485 break; 1486 default: 1487 throw new UnsupportedOperationException(); 1488 } 1489 procStack.release(proc); 1490 1491 if (proc.isSuccess()) { 1492 // update metrics on finishing the procedure. 1493 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1494 // Print out count of outstanding siblings if this procedure has a parent. 1495 Procedure<TEnvironment> parent = null; 1496 if (proc.hasParent()) { 1497 parent = procedures.get(proc.getParentProcId()); 1498 } 1499 LOG.info("Finished {} in {}{}", 1500 proc, 1501 StringUtils.humanTimeDiff(proc.elapsedTime()), 1502 parent != null? 
(", unfinishedSiblingCount=" + parent.getChildrenLatch()): ""); 1503 // Finalize the procedure state 1504 if (proc.getProcId() == rootProcId) { 1505 procedureFinished(proc); 1506 } else { 1507 execCompletionCleanup(proc); 1508 } 1509 break; 1510 } 1511 } while (procStack.isFailed()); 1512 } 1513 1514 private LockState acquireLock(Procedure<TEnvironment> proc) { 1515 TEnvironment env = getEnvironment(); 1516 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1517 // hasLock is true. 1518 if (proc.hasLock()) { 1519 return LockState.LOCK_ACQUIRED; 1520 } 1521 return proc.doAcquireLock(env, store); 1522 } 1523 1524 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1525 TEnvironment env = getEnvironment(); 1526 // For how the framework works, we know that we will always have the lock 1527 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1528 if (force || !proc.holdLock(env) || proc.isFinished()) { 1529 proc.doReleaseLock(env, store); 1530 } 1531 } 1532 1533 /** 1534 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1535 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1536 */ 1537 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1538 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1539 RemoteProcedureException exception = rootProc.getException(); 1540 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1541 // rolling back because the subprocedure does. Clarify. 
1542 if (exception == null) { 1543 exception = procStack.getException(); 1544 rootProc.setFailure(exception); 1545 store.update(rootProc); 1546 } 1547 1548 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1549 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1550 1551 int stackTail = subprocStack.size(); 1552 while (stackTail-- > 0) { 1553 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1554 IdLock.Entry lockEntry = null; 1555 // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need 1556 // this check, as the worker will hold the lock before executing a procedure. This is the only 1557 // place where we may hold two procedure execution locks, and there is a fence in the 1558 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1559 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1560 // prevent race between us and the force update thread. 1561 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1562 try { 1563 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1564 } catch (IOException e) { 1565 // can only happen if interrupted, so not a big deal to propagate it 1566 throw new UncheckedIOException(e); 1567 } 1568 } 1569 try { 1570 // For the sub procedures which are successfully finished, we do not rollback them. 1571 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1572 // recursively rollback its ancestors. The state changes which are done by sub procedures 1573 // should be handled by parent procedures when rolling back. For example, when rolling back 1574 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1575 // online, instead of rolling back the original procedures which offlined the regions(in 1576 // fact these procedures can not be rolled back...). 
1577 if (proc.isSuccess()) { 1578 // Just do the cleanup work, without actually executing the rollback 1579 subprocStack.remove(stackTail); 1580 cleanupAfterRollbackOneStep(proc); 1581 continue; 1582 } 1583 LockState lockState = acquireLock(proc); 1584 if (lockState != LockState.LOCK_ACQUIRED) { 1585 // can't take a lock on the procedure, add the root-proc back on the 1586 // queue waiting for the lock availability 1587 return lockState; 1588 } 1589 1590 lockState = executeRollback(proc); 1591 releaseLock(proc, false); 1592 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1593 abortRollback |= !isRunning() || !store.isRunning(); 1594 1595 // allows to kill the executor before something is stored to the wal. 1596 // useful to test the procedure recovery. 1597 if (abortRollback) { 1598 return lockState; 1599 } 1600 1601 subprocStack.remove(stackTail); 1602 1603 // if the procedure is kind enough to pass the slot to someone else, yield 1604 // if the proc is already finished, do not yield 1605 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1606 return LockState.LOCK_YIELD_WAIT; 1607 } 1608 1609 if (proc != rootProc) { 1610 execCompletionCleanup(proc); 1611 } 1612 } finally { 1613 if (lockEntry != null) { 1614 procExecutionLock.releaseLockEntry(lockEntry); 1615 } 1616 } 1617 } 1618 1619 // Finalize the procedure state 1620 LOG.info("Rolled back {} exec-time={}", rootProc, 1621 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1622 procedureFinished(rootProc); 1623 return LockState.LOCK_ACQUIRED; 1624 } 1625 1626 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1627 if (proc.removeStackIndex()) { 1628 if (!proc.isSuccess()) { 1629 proc.setState(ProcedureState.ROLLEDBACK); 1630 } 1631 1632 // update metrics on finishing the procedure (fail) 1633 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1634 1635 if (proc.hasParent()) { 1636 store.delete(proc.getProcId()); 1637 
procedures.remove(proc.getProcId()); 1638 } else { 1639 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1640 if (childProcIds != null) { 1641 store.delete(proc, childProcIds); 1642 } else { 1643 store.update(proc); 1644 } 1645 } 1646 } else { 1647 store.update(proc); 1648 } 1649 } 1650 1651 /** 1652 * Execute the rollback of the procedure step. 1653 * It updates the store with the new state (stack index) 1654 * or will remove completly the procedure in case it is a child. 1655 */ 1656 private LockState executeRollback(Procedure<TEnvironment> proc) { 1657 try { 1658 proc.doRollback(getEnvironment()); 1659 } catch (IOException e) { 1660 LOG.debug("Roll back attempt failed for {}", proc, e); 1661 return LockState.LOCK_YIELD_WAIT; 1662 } catch (InterruptedException e) { 1663 handleInterruptedException(proc, e); 1664 return LockState.LOCK_YIELD_WAIT; 1665 } catch (Throwable e) { 1666 // Catch NullPointerExceptions or similar errors... 1667 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1668 } 1669 1670 // allows to kill the executor before something is stored to the wal. 1671 // useful to test the procedure recovery. 1672 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1673 String msg = "TESTING: Kill before store update"; 1674 LOG.debug(msg); 1675 stop(); 1676 throw new RuntimeException(msg); 1677 } 1678 1679 cleanupAfterRollbackOneStep(proc); 1680 1681 return LockState.LOCK_ACQUIRED; 1682 } 1683 1684 private void yieldProcedure(Procedure<TEnvironment> proc) { 1685 releaseLock(proc, false); 1686 scheduler.yield(proc); 1687 } 1688 1689 /** 1690 * Executes <code>procedure</code> 1691 * <ul> 1692 * <li>Calls the doExecute() of the procedure 1693 * <li>If the procedure execution didn't fail (i.e. valid user input) 1694 * <ul> 1695 * <li>...and returned subprocedures 1696 * <ul><li>The subprocedures are initialized. 
1697 * <li>The subprocedures are added to the store 1698 * <li>The subprocedures are added to the runnable queue 1699 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1700 * </ul> 1701 * </li> 1702 * <li>...if there are no subprocedure 1703 * <ul><li>the procedure completed successfully 1704 * <li>if there is a parent (WAITING) 1705 * <li>the parent state will be set to RUNNABLE 1706 * </ul> 1707 * </li> 1708 * </ul> 1709 * </li> 1710 * <li>In case of failure 1711 * <ul> 1712 * <li>The store is updated with the new state</li> 1713 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1714 * </ul> 1715 * </li> 1716 * </ul> 1717 */ 1718 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1719 Procedure<TEnvironment> procedure) { 1720 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1721 "NOT RUNNABLE! " + procedure.toString()); 1722 1723 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1724 // The exception is caught below and then we hurry to the exit without disturbing state. The 1725 // idea is that the processing of this procedure will be unsuspended later by an external event 1726 // such the report of a region open. 1727 boolean suspended = false; 1728 1729 // Whether to 're-' -execute; run through the loop again. 
1730 boolean reExecute = false; 1731 1732 Procedure<TEnvironment>[] subprocs = null; 1733 do { 1734 reExecute = false; 1735 procedure.resetPersistence(); 1736 try { 1737 subprocs = procedure.doExecute(getEnvironment()); 1738 if (subprocs != null && subprocs.length == 0) { 1739 subprocs = null; 1740 } 1741 } catch (ProcedureSuspendedException e) { 1742 LOG.trace("Suspend {}", procedure); 1743 suspended = true; 1744 } catch (ProcedureYieldException e) { 1745 LOG.trace("Yield {}", procedure, e); 1746 yieldProcedure(procedure); 1747 return; 1748 } catch (InterruptedException e) { 1749 LOG.trace("Yield interrupt {}", procedure, e); 1750 handleInterruptedException(procedure, e); 1751 yieldProcedure(procedure); 1752 return; 1753 } catch (Throwable e) { 1754 // Catch NullPointerExceptions or similar errors... 1755 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1756 LOG.error(msg, e); 1757 procedure.setFailure(new RemoteProcedureException(msg, e)); 1758 } 1759 1760 if (!procedure.isFailed()) { 1761 if (subprocs != null) { 1762 if (subprocs.length == 1 && subprocs[0] == procedure) { 1763 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1764 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1765 subprocs = null; 1766 reExecute = true; 1767 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1768 } else { 1769 // Yield the current procedure, and make the subprocedure runnable 1770 // subprocs may come back 'null'. 1771 subprocs = initializeChildren(procStack, procedure, subprocs); 1772 LOG.info("Initialized subprocedures=" + 1773 (subprocs == null? null: 1774 Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
1775 collect(Collectors.toList()).toString())); 1776 } 1777 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1778 LOG.trace("Added to timeoutExecutor {}", procedure); 1779 timeoutExecutor.add(procedure); 1780 } else if (!suspended) { 1781 // No subtask, so we are done 1782 procedure.setState(ProcedureState.SUCCESS); 1783 } 1784 } 1785 1786 // Add the procedure to the stack 1787 procStack.addRollbackStep(procedure); 1788 1789 // allows to kill the executor before something is stored to the wal. 1790 // useful to test the procedure recovery. 1791 if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) { 1792 kill("TESTING: Kill BEFORE store update: " + procedure); 1793 } 1794 1795 // TODO: The code here doesn't check if store is running before persisting to the store as 1796 // it relies on the method call below to throw RuntimeException to wind up the stack and 1797 // executor thread to stop. The statement following the method call below seems to check if 1798 // store is not running, to prevent scheduling children procedures, re-execution or yield 1799 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1800 // 1801 // Commit the transaction even if a suspend (state may have changed). Note this append 1802 // can take a bunch of time to complete. 
1803 if (procedure.needPersistence()) { 1804 updateStoreOnExec(procStack, procedure, subprocs); 1805 } 1806 1807 // if the store is not running we are aborting 1808 if (!store.isRunning()) { 1809 return; 1810 } 1811 // if the procedure is kind enough to pass the slot to someone else, yield 1812 if (procedure.isRunnable() && !suspended && 1813 procedure.isYieldAfterExecutionStep(getEnvironment())) { 1814 yieldProcedure(procedure); 1815 return; 1816 } 1817 1818 assert (reExecute && subprocs == null) || !reExecute; 1819 } while (reExecute); 1820 1821 // Allows to kill the executor after something is stored to the WAL but before the below 1822 // state settings are done -- in particular the one on the end where we make parent 1823 // RUNNABLE again when its children are done; see countDownChildren. 1824 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1825 kill("TESTING: Kill AFTER store update: " + procedure); 1826 } 1827 1828 // Submit the new subprocedures 1829 if (subprocs != null && !procedure.isFailed()) { 1830 submitChildrenProcedures(subprocs); 1831 } 1832 1833 // we need to log the release lock operation before waking up the parent procedure, as there 1834 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1835 // the sub procedures from store and cause problems... 1836 releaseLock(procedure, false); 1837 1838 // if the procedure is complete and has a parent, count down the children latch. 1839 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1840 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1841 countDownChildren(procStack, procedure); 1842 } 1843 } 1844 1845 private void kill(String msg) { 1846 LOG.debug(msg); 1847 stop(); 1848 throw new RuntimeException(msg); 1849 } 1850 1851 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1852 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1853 assert subprocs != null : "expected subprocedures"; 1854 final long rootProcId = getRootProcedureId(procedure); 1855 for (int i = 0; i < subprocs.length; ++i) { 1856 Procedure<TEnvironment> subproc = subprocs[i]; 1857 if (subproc == null) { 1858 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1859 procedure.setFailure(new RemoteProcedureException(msg, 1860 new IllegalArgumentIOException(msg))); 1861 return null; 1862 } 1863 1864 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1865 subproc.setParentProcId(procedure.getProcId()); 1866 subproc.setRootProcId(rootProcId); 1867 subproc.setProcId(nextProcId()); 1868 procStack.addSubProcedure(subproc); 1869 } 1870 1871 if (!procedure.isFailed()) { 1872 procedure.setChildrenLatch(subprocs.length); 1873 switch (procedure.getState()) { 1874 case RUNNABLE: 1875 procedure.setState(ProcedureState.WAITING); 1876 break; 1877 case WAITING_TIMEOUT: 1878 timeoutExecutor.add(procedure); 1879 break; 1880 default: 1881 break; 1882 } 1883 } 1884 return subprocs; 1885 } 1886 1887 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1888 for (int i = 0; i < subprocs.length; ++i) { 1889 Procedure<TEnvironment> subproc = subprocs[i]; 1890 subproc.updateMetricsOnSubmit(getEnvironment()); 1891 assert !procedures.containsKey(subproc.getProcId()); 1892 procedures.put(subproc.getProcId(), subproc); 1893 scheduler.addFront(subproc); 1894 } 1895 } 1896 1897 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1898 
Procedure<TEnvironment> procedure) { 1899 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1900 if (parent == null) { 1901 assert procStack.isRollingback(); 1902 return; 1903 } 1904 1905 // If this procedure is the last child awake the parent procedure 1906 if (parent.tryRunnable()) { 1907 // If we succeeded in making the parent runnable -- i.e. all of its 1908 // children have completed, move parent to front of the queue. 1909 store.update(parent); 1910 scheduler.addFront(parent); 1911 LOG.info("Finished subprocedure pid={}, resume processing parent {}", 1912 procedure.getProcId(), parent); 1913 return; 1914 } 1915 } 1916 1917 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1918 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1919 if (subprocs != null && !procedure.isFailed()) { 1920 if (LOG.isTraceEnabled()) { 1921 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1922 } 1923 store.insert(procedure, subprocs); 1924 } else { 1925 LOG.trace("Store update {}", procedure); 1926 if (procedure.isFinished() && !procedure.hasParent()) { 1927 // remove child procedures 1928 final long[] childProcIds = procStack.getSubprocedureIds(); 1929 if (childProcIds != null) { 1930 store.delete(procedure, childProcIds); 1931 for (int i = 0; i < childProcIds.length; ++i) { 1932 procedures.remove(childProcIds[i]); 1933 } 1934 } else { 1935 store.update(procedure); 1936 } 1937 } else { 1938 store.update(procedure); 1939 } 1940 } 1941 } 1942 1943 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1944 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1945 // NOTE: We don't call Thread.currentThread().interrupt() 1946 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1947 // the InterruptedException. 
If the master is going down, we will be notified 1948 // and the executor/store will be stopped. 1949 // (The interrupted procedure will be retried on the next run) 1950 } 1951 1952 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1953 final TEnvironment env = getEnvironment(); 1954 if (proc.hasLock()) { 1955 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + 1956 " is finished, even if the holdLock is true, arrive here means we have some holes where" + 1957 " we do not release the lock. And the releaseLock below may fail since the procedure may" + 1958 " have already been deleted from the procedure store."); 1959 releaseLock(proc, true); 1960 } 1961 try { 1962 proc.completionCleanup(env); 1963 } catch (Throwable e) { 1964 // Catch NullPointerExceptions or similar errors... 1965 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1966 } 1967 } 1968 1969 private void procedureFinished(Procedure<TEnvironment> proc) { 1970 // call the procedure completion cleanup handler 1971 execCompletionCleanup(proc); 1972 1973 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1974 1975 // update the executor internal state maps 1976 if (!proc.shouldWaitClientAck(getEnvironment())) { 1977 retainer.setClientAckTime(0); 1978 } 1979 1980 completed.put(proc.getProcId(), retainer); 1981 rollbackStack.remove(proc.getProcId()); 1982 procedures.remove(proc.getProcId()); 1983 1984 // call the runnableSet completion cleanup handler 1985 try { 1986 scheduler.completionCleanup(proc); 1987 } catch (Throwable e) { 1988 // Catch NullPointerExceptions or similar errors... 
1989 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1990 } 1991 1992 // Notify the listeners 1993 sendProcedureFinishedNotification(proc.getProcId()); 1994 } 1995 1996 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1997 return rollbackStack.get(rootProcId); 1998 } 1999 2000 @VisibleForTesting 2001 ProcedureScheduler getProcedureScheduler() { 2002 return scheduler; 2003 } 2004 2005 @VisibleForTesting 2006 int getCompletedSize() { 2007 return completed.size(); 2008 } 2009 2010 // ========================================================================== 2011 // Worker Thread 2012 // ========================================================================== 2013 private class WorkerThread extends StoppableThread { 2014 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 2015 private volatile Procedure<TEnvironment> activeProcedure; 2016 private boolean onlyPollUrgent = false; 2017 public WorkerThread(ThreadGroup group) { 2018 this(group, "PEWorker-"); 2019 } 2020 2021 protected WorkerThread(ThreadGroup group, String prefix) { 2022 this(group, prefix, false); 2023 } 2024 2025 protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) { 2026 super(group, prefix + workerId.incrementAndGet()); 2027 this.onlyPollUrgent = onlyPollUrgent; 2028 setDaemon(true); 2029 } 2030 2031 @Override 2032 public void sendStopSignal() { 2033 scheduler.signalAll(); 2034 } 2035 @Override 2036 public void run() { 2037 long lastUpdate = EnvironmentEdgeManager.currentTime(); 2038 try { 2039 while (isRunning() && keepAlive(lastUpdate)) { 2040 Procedure<TEnvironment> proc = scheduler 2041 .poll(onlyPollUrgent, keepAliveTime, TimeUnit.MILLISECONDS); 2042 if (proc == null) { 2043 continue; 2044 } 2045 this.activeProcedure = proc; 2046 int activeCount = activeExecutorCount.incrementAndGet(); 2047 int runningCount = store.setRunningProcedureCount(activeCount); 2048 LOG.trace("Execute pid={} 
runningCount={}, activeCount={}", proc.getProcId(), 2049 runningCount, activeCount); 2050 executionStartTime.set(EnvironmentEdgeManager.currentTime()); 2051 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 2052 try { 2053 executeProcedure(proc); 2054 } catch (AssertionError e) { 2055 LOG.info("ASSERT pid=" + proc.getProcId(), e); 2056 throw e; 2057 } finally { 2058 procExecutionLock.releaseLockEntry(lockEntry); 2059 activeCount = activeExecutorCount.decrementAndGet(); 2060 runningCount = store.setRunningProcedureCount(activeCount); 2061 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), 2062 runningCount, activeCount); 2063 this.activeProcedure = null; 2064 lastUpdate = EnvironmentEdgeManager.currentTime(); 2065 executionStartTime.set(Long.MAX_VALUE); 2066 } 2067 } 2068 } catch (Throwable t) { 2069 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 2070 } finally { 2071 LOG.trace("Worker terminated."); 2072 } 2073 if (onlyPollUrgent) { 2074 urgentWorkerThreads.remove(this); 2075 } else { 2076 workerThreads.remove(this); 2077 } 2078 } 2079 2080 @Override 2081 public String toString() { 2082 Procedure<?> p = this.activeProcedure; 2083 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); 2084 } 2085 2086 /** 2087 * @return the time since the current procedure is running 2088 */ 2089 public long getCurrentRunTime() { 2090 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2091 } 2092 2093 // core worker never timeout 2094 protected boolean keepAlive(long lastUpdate) { 2095 return true; 2096 } 2097 } 2098 2099 // A worker thread which can be added when core workers are stuck. Will timeout after 2100 // keepAliveTime if there is no procedure to run. 
2101 private final class KeepAliveWorkerThread extends WorkerThread { 2102 2103 public KeepAliveWorkerThread(ThreadGroup group) { 2104 super(group, "KeepAlivePEWorker-"); 2105 } 2106 2107 @Override 2108 protected boolean keepAlive(long lastUpdate) { 2109 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2110 } 2111 } 2112 2113 // ---------------------------------------------------------------------------- 2114 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2115 // full set of procedures pending and completed to write a compacted 2116 // version of the log (in case is a log)? 2117 // In theory no, procedures are have a short life, so at some point the store 2118 // will have the tracker saying everything is in the last log. 2119 // ---------------------------------------------------------------------------- 2120 2121 private final class WorkerMonitor extends InlineChore { 2122 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2123 "hbase.procedure.worker.monitor.interval.msec"; 2124 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2125 2126 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2127 "hbase.procedure.worker.stuck.threshold.msec"; 2128 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2129 2130 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2131 "hbase.procedure.worker.add.stuck.percentage"; 2132 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2133 2134 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2135 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2136 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2137 2138 public WorkerMonitor() { 2139 refreshConfig(); 2140 } 2141 2142 @Override 2143 public void run() { 2144 final int stuckCount = checkForStuckWorkers(); 2145 checkThreadCount(stuckCount); 2146 2147 // refresh 
interval (poor man dynamic conf update) 2148 refreshConfig(); 2149 } 2150 2151 private int checkForStuckWorkers() { 2152 // check if any of the worker is stuck 2153 int stuckCount = 0; 2154 for (WorkerThread worker : workerThreads) { 2155 if (worker.getCurrentRunTime() < stuckThreshold) { 2156 continue; 2157 } 2158 2159 // WARN the worker is stuck 2160 stuckCount++; 2161 LOG.warn("Worker stuck {}, run time {}", worker, 2162 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2163 } 2164 return stuckCount; 2165 } 2166 2167 private void checkThreadCount(final int stuckCount) { 2168 // nothing to do if there are no runnable tasks 2169 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2170 return; 2171 } 2172 2173 // add a new thread if the worker stuck percentage exceed the threshold limit 2174 // and every handler is active. 2175 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2176 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2177 // work to do. 2178 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2179 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2180 workerThreads.add(worker); 2181 worker.start(); 2182 LOG.debug("Added new worker thread {}", worker); 2183 } 2184 } 2185 2186 private void refreshConfig() { 2187 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, 2188 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2189 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, 2190 DEFAULT_WORKER_MONITOR_INTERVAL); 2191 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, 2192 DEFAULT_WORKER_STUCK_THRESHOLD); 2193 } 2194 2195 @Override 2196 public int getTimeoutInterval() { 2197 return timeoutInterval; 2198 } 2199 } 2200}