001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2; 019 020import java.io.IOException; 021import java.io.UncheckedIOException; 022import java.util.ArrayDeque; 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Collection; 026import java.util.Deque; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Set; 030import java.util.concurrent.ConcurrentHashMap; 031import java.util.concurrent.CopyOnWriteArrayList; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Executors; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.atomic.AtomicBoolean; 036import java.util.concurrent.atomic.AtomicInteger; 037import java.util.concurrent.atomic.AtomicLong; 038import java.util.stream.Collectors; 039import java.util.stream.Stream; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.hbase.HConstants; 042import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; 043import org.apache.hadoop.hbase.log.HBaseMarkers; 044import org.apache.hadoop.hbase.procedure2.Procedure.LockState; 045import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore; 046import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; 047import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureStoreListener; 048import org.apache.hadoop.hbase.procedure2.util.StringUtils; 049import org.apache.hadoop.hbase.security.User; 050import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 051import org.apache.hadoop.hbase.util.IdLock; 052import org.apache.hadoop.hbase.util.NonceKey; 053import org.apache.hadoop.hbase.util.Threads; 054import org.apache.yetus.audience.InterfaceAudience; 055import org.slf4j.Logger; 056import org.slf4j.LoggerFactory; 057 058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 059import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 060import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 061 062import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; 063 064/** 065 * Thread Pool that executes the submitted procedures. 066 * The executor has a ProcedureStore associated. 067 * Each operation is logged and on restart the pending procedures are resumed. 068 * 069 * Unless the Procedure code throws an error (e.g. invalid user input) 070 * the procedure will complete (at some point in time). On restart the pending 071 * procedures are resumed and the ones that failed will be rolled back. 
072 * 073 * The user can add procedures to the executor via submitProcedure(proc) 074 * check for the finished state via isFinished(procId) 075 * and get the result via getResult(procId) 076 */ 077@InterfaceAudience.Private 078public class ProcedureExecutor<TEnvironment> { 079 private static final Logger LOG = LoggerFactory.getLogger(ProcedureExecutor.class); 080 081 public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; 082 private static final boolean DEFAULT_CHECK_OWNER_SET = false; 083 084 public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = 085 "hbase.procedure.worker.keep.alive.time.msec"; 086 private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = TimeUnit.MINUTES.toMillis(1); 087 088 // Enable this flag if you want to upgrade to 2.2+, there are some incompatible changes on how we 089 // assign or unassign a region, so we need to make sure all these procedures have been finished 090 // before we start the master with new code. See HBASE-20881 and HBASE-21075 for more details. 091 public static final String UPGRADE_TO_2_2 = "hbase.procedure.upgrade-to-2-2"; 092 private static final boolean DEFAULT_UPGRADE_TO_2_2 = false; 093 094 public static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; 095 static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min 096 097 public static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; 098 static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min 099 100 /** 101 * {@link #testing} is non-null when ProcedureExecutor is being tested. Tests will try to 102 * break PE having it fail at various junctures. When non-null, testing is set to an instance of 103 * the below internal {@link Testing} class with flags set for the particular test. 104 */ 105 Testing testing = null; 106 107 /** 108 * Class with parameters describing how to fail/die when in testing-context. 
109 */ 110 public static class Testing { 111 protected boolean killIfSuspended = false; 112 113 /** 114 * Kill the PE BEFORE we store state to the WAL. Good for figuring out if a Procedure is 115 * persisting all the state it needs to recover after a crash. 116 */ 117 protected boolean killBeforeStoreUpdate = false; 118 protected boolean toggleKillBeforeStoreUpdate = false; 119 120 /** 121 * Set when we want to fail AFTER state has been stored into the WAL. Rarely used. HBASE-20978 122 * is about a case where memory-state was being set after store to WAL where a crash could 123 * cause us to get stuck. This flag allows killing at what was a vulnerable time. 124 */ 125 protected boolean killAfterStoreUpdate = false; 126 protected boolean toggleKillAfterStoreUpdate = false; 127 128 protected boolean shouldKillBeforeStoreUpdate() { 129 final boolean kill = this.killBeforeStoreUpdate; 130 if (this.toggleKillBeforeStoreUpdate) { 131 this.killBeforeStoreUpdate = !kill; 132 LOG.warn("Toggle KILL before store update to: " + this.killBeforeStoreUpdate); 133 } 134 return kill; 135 } 136 137 protected boolean shouldKillBeforeStoreUpdate(final boolean isSuspended) { 138 return (isSuspended && !killIfSuspended) ? false : shouldKillBeforeStoreUpdate(); 139 } 140 141 protected boolean shouldKillAfterStoreUpdate() { 142 final boolean kill = this.killAfterStoreUpdate; 143 if (this.toggleKillAfterStoreUpdate) { 144 this.killAfterStoreUpdate = !kill; 145 LOG.warn("Toggle KILL after store update to: " + this.killAfterStoreUpdate); 146 } 147 return kill; 148 } 149 150 protected boolean shouldKillAfterStoreUpdate(final boolean isSuspended) { 151 return (isSuspended && !killIfSuspended) ? 
false : shouldKillAfterStoreUpdate(); 152 } 153 } 154 155 public interface ProcedureExecutorListener { 156 void procedureLoaded(long procId); 157 void procedureAdded(long procId); 158 void procedureFinished(long procId); 159 } 160 161 /** 162 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the Procedure. 163 * Once a Root-Procedure completes (success or failure), the result will be added to this map. 164 * The user of ProcedureExecutor should call getResult(procId) to get the result. 165 */ 166 private final ConcurrentHashMap<Long, CompletedProcedureRetainer<TEnvironment>> completed = 167 new ConcurrentHashMap<>(); 168 169 /** 170 * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 171 * The RootProcedureState contains the execution stack of the Root-Procedure, 172 * It is added to the map by submitProcedure() and removed on procedure completion. 173 */ 174 private final ConcurrentHashMap<Long, RootProcedureState<TEnvironment>> rollbackStack = 175 new ConcurrentHashMap<>(); 176 177 /** 178 * Helper map to lookup the live procedures by ID. 179 * This map contains every procedure. root-procedures and subprocedures. 180 */ 181 private final ConcurrentHashMap<Long, Procedure<TEnvironment>> procedures = 182 new ConcurrentHashMap<>(); 183 184 /** 185 * Helper map to lookup whether the procedure already issued from the same client. This map 186 * contains every root procedure. 187 */ 188 private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<>(); 189 190 private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = 191 new CopyOnWriteArrayList<>(); 192 193 private Configuration conf; 194 195 /** 196 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing 197 * resource handling rather than observing in a #join is unexpected). 
/**
 * Created in the {@link #init(int, boolean)} method. Destroyed in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private ThreadGroup threadGroup;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private CopyOnWriteArrayList<WorkerThread> workerThreads;

/**
 * Worker thread only for urgent tasks. Created in init alongside {@link #workerThreads} and
 * awaited in {@link #join()}.
 */
private CopyOnWriteArrayList<WorkerThread> urgentWorkerThreads;

/**
 * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing
 * resource handling rather than observing in a #join is unexpected).
 * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery
 * (Should be ok).
 */
private TimeoutExecutorThread<TEnvironment> timeoutExecutor;

// Pool sizing, assigned in init(): corePoolSize = numThreads, maxPoolSize = 10 * numThreads,
// urgentPoolSize = urgentNumThreads.
private int corePoolSize;
private int maxPoolSize;
private int urgentPoolSize;

// Worker keep-alive time; setKeepAliveTime() converts to milliseconds before storing it here.
private volatile long keepAliveTime;

/**
 * Scheduler/Queue that contains runnable procedures.
 */
private final ProcedureScheduler scheduler;

// Single daemon thread ("Force-Update-PEWorker-%d") on which the store listener registered in the
// constructor runs forceUpdateProcedure() for each procedure id the store asks to be rewritten.
private final Executor forceUpdateExecutor = Executors.newSingleThreadExecutor(
  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Force-Update-PEWorker-%d").build());

// Starts at -1; set from the store's max proc id during load() and reset to -1 in join().
private final AtomicLong lastProcId = new AtomicLong(-1);
// Reset to 0 in init(); presumably used to number worker threads -- TODO confirm against
// WorkerThread, which is not visible here.
private final AtomicLong workerId = new AtomicLong(0);
// Exposed through getActiveExecutorCount(); where it is incremented is not visible here.
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
// Flipped to true by startWorkers() and back to false by stop(); read through isRunning().
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;

// Read once in the constructor from CHECK_OWNER_SET_CONF_KEY (default DEFAULT_CHECK_OWNER_SET).
private final boolean checkOwnerSet;
248 // For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the 249 // procedure is woken up before we finish the suspend which causes the same procedures to be 250 // executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also 251 // a bit confusing to the developers. So here we introduce this lock to prevent the concurrent 252 // execution of the same procedure. 253 private final IdLock procExecutionLock = new IdLock(); 254 255 private final boolean upgradeTo2_2; 256 257 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 258 final ProcedureStore store) { 259 this(conf, environment, store, new SimpleProcedureScheduler()); 260 } 261 262 private boolean isRootFinished(Procedure<?> proc) { 263 Procedure<?> rootProc = procedures.get(proc.getRootProcId()); 264 return rootProc == null || rootProc.isFinished(); 265 } 266 267 private void forceUpdateProcedure(long procId) throws IOException { 268 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(procId); 269 try { 270 Procedure<TEnvironment> proc = procedures.get(procId); 271 if (proc != null) { 272 if (proc.isFinished() && proc.hasParent() && isRootFinished(proc)) { 273 LOG.debug("Procedure {} has already been finished and parent is succeeded," + 274 " skip force updating", proc); 275 return; 276 } 277 } else { 278 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 279 if (retainer == null || retainer.getProcedure() instanceof FailedProcedure) { 280 LOG.debug("No pending procedure with id = {}, skip force updating.", procId); 281 return; 282 } 283 long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); 284 long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); 285 if (retainer.isExpired(System.currentTimeMillis(), evictTtl, evictAckTtl)) { 286 LOG.debug("Procedure {} has already been finished and expired, skip force updating", 287 procId); 288 
return; 289 } 290 proc = retainer.getProcedure(); 291 } 292 LOG.debug("Force update procedure {}", proc); 293 store.update(proc); 294 } finally { 295 procExecutionLock.releaseLockEntry(lockEntry); 296 } 297 } 298 299 public ProcedureExecutor(final Configuration conf, final TEnvironment environment, 300 final ProcedureStore store, final ProcedureScheduler scheduler) { 301 this.environment = environment; 302 this.scheduler = scheduler; 303 this.store = store; 304 this.conf = conf; 305 this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); 306 this.upgradeTo2_2 = conf.getBoolean(UPGRADE_TO_2_2, DEFAULT_UPGRADE_TO_2_2); 307 refreshConfiguration(conf); 308 store.registerListener(new ProcedureStoreListener() { 309 310 @Override 311 public void forceUpdate(long[] procIds) { 312 Arrays.stream(procIds).forEach(procId -> forceUpdateExecutor.execute(() -> { 313 try { 314 forceUpdateProcedure(procId); 315 } catch (IOException e) { 316 LOG.warn("Failed to force update procedure with pid={}", procId); 317 } 318 })); 319 } 320 }); 321 } 322 323 private void load(final boolean abortOnCorruption) throws IOException { 324 Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); 325 Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); 326 Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); 327 Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); 328 329 store.load(new ProcedureStore.ProcedureLoader() { 330 @Override 331 public void setMaxProcId(long maxProcId) { 332 assert lastProcId.get() < 0 : "expected only one call to setMaxProcId()"; 333 lastProcId.set(maxProcId); 334 } 335 336 @Override 337 public void load(ProcedureIterator procIter) throws IOException { 338 loadProcedures(procIter, abortOnCorruption); 339 } 340 341 @Override 342 public void handleCorrupted(ProcedureIterator procIter) throws IOException { 343 int corruptedCount = 0; 344 while 
(procIter.hasNext()) { 345 Procedure<?> proc = procIter.next(); 346 LOG.error("Corrupt " + proc); 347 corruptedCount++; 348 } 349 if (abortOnCorruption && corruptedCount > 0) { 350 throw new IOException("found " + corruptedCount + " corrupted procedure(s) on replay"); 351 } 352 } 353 }); 354 } 355 356 private void restoreLock(Procedure<TEnvironment> proc, Set<Long> restored) { 357 proc.restoreLock(getEnvironment()); 358 restored.add(proc.getProcId()); 359 } 360 361 private void restoreLocks(Deque<Procedure<TEnvironment>> stack, Set<Long> restored) { 362 while (!stack.isEmpty()) { 363 restoreLock(stack.pop(), restored); 364 } 365 } 366 367 // Restore the locks for all the procedures. 368 // Notice that we need to restore the locks starting from the root proc, otherwise there will be 369 // problem that a sub procedure may hold the exclusive lock first and then we are stuck when 370 // calling the acquireLock method for the parent procedure. 371 // The algorithm is straight-forward: 372 // 1. Use a set to record the procedures which locks have already been restored. 373 // 2. Use a stack to store the hierarchy of the procedures 374 // 3. For all the procedure, we will first try to find its parent and push it into the stack, 375 // unless 376 // a. We have no parent, i.e, we are the root procedure 377 // b. The lock has already been restored(by checking the set introduced in #1) 378 // then we start to pop the stack and call acquireLock for each procedure. 379 // Notice that this should be done for all procedures, not only the ones in runnableList. 
// Restore the locks for all the procedures.
// Notice that we need to restore the locks starting from the root proc, otherwise there will be
// problem that a sub procedure may hold the exclusive lock first and then we are stuck when
// calling the acquireLock method for the parent procedure.
// The algorithm is straight-forward:
// 1. Use a set to record the procedures which locks have already been restored.
// 2. Use a stack to store the hierarchy of the procedures
// 3. For all the procedure, we will first try to find its parent and push it into the stack,
// unless
// a. We have no parent, i.e, we are the root procedure
// b. The lock has already been restored(by checking the set introduced in #1)
// then we start to pop the stack and call acquireLock for each procedure.
// Notice that this should be done for all procedures, not only the ones in runnableList.
private void restoreLocks() {
  Set<Long> restored = new HashSet<>();
  Deque<Procedure<TEnvironment>> stack = new ArrayDeque<>();
  procedures.values().forEach(proc -> {
    // Walk up the parent chain, stacking procedures, until we hit one that is already restored
    // or the root; then unwind the stack so ancestors are restored before descendants.
    for (;;) {
      if (restored.contains(proc.getProcId())) {
        restoreLocks(stack, restored);
        return;
      }
      if (!proc.hasParent()) {
        restoreLock(proc, restored);
        restoreLocks(stack, restored);
        return;
      }
      stack.push(proc);
      // The lambda parameter is deliberately re-pointed at the parent for the next iteration.
      proc = procedures.get(proc.getParentProcId());
    }
  });
}

/**
 * Rebuilds the in-memory state from the procedures handed over by the store: fills the
 * completed/live/nonce maps, rebuilds the rollback stacks, restores locks and finally enqueues
 * the procedures on the timeout executor or the scheduler.
 * @param procIter iterator over the procedures read from the store; it is traversed twice
 *          (reset in between)
 * @param abortOnCorruption not read by this method's body
 * @throws IOException if the iterator fails
 */
private void loadProcedures(ProcedureIterator procIter, boolean abortOnCorruption)
    throws IOException {
  // 1. Build the rollback stack
  // The per-state counters are only used to pre-size the lists created in step 2.
  int runnableCount = 0;
  int failedCount = 0;
  int waitingCount = 0;
  int waitingTimeoutCount = 0;
  while (procIter.hasNext()) {
    boolean finished = procIter.isNextFinished();
    Procedure<TEnvironment> proc = procIter.next();
    NonceKey nonceKey = proc.getNonceKey();
    long procId = proc.getProcId();

    if (finished) {
      completed.put(proc.getProcId(), new CompletedProcedureRetainer<>(proc));
      LOG.debug("Completed {}", proc);
    } else {
      if (!proc.hasParent()) {
        assert !proc.isFinished() : "unexpected finished procedure";
        rollbackStack.put(proc.getProcId(), new RootProcedureState<>());
      }

      // add the procedure to the map
      proc.beforeReplay(getEnvironment());
      procedures.put(proc.getProcId(), proc);
      switch (proc.getState()) {
        case RUNNABLE:
          runnableCount++;
          break;
        case FAILED:
          failedCount++;
          break;
        case WAITING:
          waitingCount++;
          break;
        case WAITING_TIMEOUT:
          waitingTimeoutCount++;
          break;
        default:
          break;
      }
    }

    // add the nonce to the map
    // (done for finished and unfinished procedures alike)
    if (nonceKey != null) {
      nonceKeysToProcIdsMap.put(nonceKey, procId);
    }
  }

  // 2. Initialize the stacks
  // In the old implementation, for procedures in FAILED state, we will push it into the
  // ProcedureScheduler directly to execute the rollback. But this does not work after we
  // introduce the restore lock stage.
  // For now, when we acquire a xlock, we will remove the queue from runQueue in scheduler, and
  // then when a procedure which has lock access, for example, a sub procedure of the procedure
  // which has the xlock, is pushed into the scheduler, we will add the queue back to let the
  // workers poll from it. The assumption here is that, the procedure which has the xlock should
  // have been polled out already, so when loading we can not add the procedure to scheduler first
  // and then call acquireLock, since the procedure is still in the queue, and since we will
  // remove the queue from runQueue, then no one can poll it out, then there is a dead lock
  List<Procedure<TEnvironment>> runnableList = new ArrayList<>(runnableCount);
  List<Procedure<TEnvironment>> failedList = new ArrayList<>(failedCount);
  List<Procedure<TEnvironment>> waitingList = new ArrayList<>(waitingCount);
  List<Procedure<TEnvironment>> waitingTimeoutList = new ArrayList<>(waitingTimeoutCount);
  procIter.reset();
  while (procIter.hasNext()) {
    // Finished procedures were already placed in the completed map during the first pass.
    if (procIter.isNextFinished()) {
      procIter.skipNext();
      continue;
    }

    Procedure<TEnvironment> proc = procIter.next();
    assert !(proc.isFinished() && !proc.hasParent()) : "unexpected completed proc=" + proc;

    LOG.debug("Loading {}", proc);

    Long rootProcId = getRootProcedureId(proc);
    // The orphan procedures will be passed to handleCorrupted, so add an assert here
    assert rootProcId != null;

    if (proc.hasParent()) {
      Procedure<TEnvironment> parent = procedures.get(proc.getParentProcId());
      // Only unfinished children count towards the parent's children latch.
      if (parent != null && !proc.isFinished()) {
        parent.incChildrenLatch();
      }
    }

    RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId);
    procStack.loadStack(proc);

    proc.setRootProcId(rootProcId);
    switch (proc.getState()) {
      case RUNNABLE:
        runnableList.add(proc);
        break;
      case WAITING:
        waitingList.add(proc);
        break;
      case WAITING_TIMEOUT:
        waitingTimeoutList.add(proc);
        break;
      case FAILED:
        failedList.add(proc);
        break;
      case ROLLEDBACK:
      case INITIALIZING:
        // Neither state is expected for an unfinished procedure coming out of the store.
        String msg = "Unexpected " + proc.getState() + " state for " + proc;
        LOG.error(msg);
        throw new UnsupportedOperationException(msg);
      default:
        break;
    }
  }

  // 3. Check the waiting procedures to see if some of them can be added to runnable.
  waitingList.forEach(proc -> {
    if (!proc.hasChildren()) {
      // Normally, WAITING procedures should be waken by its children.
      // But, there is a case that, all the children are successful and before
      // they can wake up their parent procedure, the master was killed.
      // So, during recovering the procedures from ProcedureWal, its children
      // are not loaded because of their SUCCESS state.
      // So we need to continue to run this WAITING procedure. But before
      // executing, we need to set its state to RUNNABLE, otherwise, a exception
      // will throw:
      // Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE,
      // "NOT RUNNABLE! " + procedure.toString());
      proc.setState(ProcedureState.RUNNABLE);
      runnableList.add(proc);
    } else {
      proc.afterReplay(getEnvironment());
    }
  });
  // 4. restore locks
  // Must happen before anything is pushed to the scheduler (see the comment in step 2).
  restoreLocks();

  // 5. Push the procedures to the timeout executor
  waitingTimeoutList.forEach(proc -> {
    proc.afterReplay(getEnvironment());
    timeoutExecutor.add(proc);
  });

  // 6. Push the procedure to the scheduler
  // Failed procedures are enqueued first, without an afterReplay() call.
  failedList.forEach(scheduler::addBack);
  runnableList.forEach(p -> {
    p.afterReplay(getEnvironment());
    // Only root procedures trigger the "loaded" notification.
    if (!p.hasParent()) {
      sendProcedureLoadedNotification(p.getProcId());
    }
    scheduler.addBack(p);
  });
  // After all procedures put into the queue, signal the worker threads.
  // Otherwise, there is a race condition. See HBASE-21364.
  scheduler.signalAll();
}

/**
 * Initialize the procedure executor, but do not start workers. We will start them later.
 * <p/>
 * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and
 * ensure a single executor, and start the procedure replay to resume and recover the previous
 * pending and in-progress procedures.
 * <p/>
 * Delegates to {@link #init(int, int, boolean)} with zero urgent threads.
 * @param numThreads number of threads available for procedure execution.
 * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure
 *          is found on replay. otherwise false.
 * @throws IOException if recovering the lease or loading the store fails
 */
public void init(int numThreads, boolean abortOnCorruption) throws IOException {
  init(numThreads, 0, abortOnCorruption);
}
580 */ 581 public void init(int numThreads, int urgentNumThreads, 582 boolean abortOnCorruption) throws IOException { 583 // We have numThreads executor + one timer thread used for timing out 584 // procedures and triggering periodic procedures. 585 this.corePoolSize = numThreads; 586 this.maxPoolSize = 10 * numThreads; 587 this.urgentPoolSize = urgentNumThreads; 588 LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker " 589 + "count={}, start {} urgent thread(s)", 590 corePoolSize, maxPoolSize, urgentPoolSize); 591 592 this.threadGroup = new ThreadGroup("PEWorkerGroup"); 593 this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); 594 595 // Create the workers 596 workerId.set(0); 597 workerThreads = new CopyOnWriteArrayList<>(); 598 urgentWorkerThreads = new CopyOnWriteArrayList<>(); 599 for (int i = 0; i < corePoolSize; ++i) { 600 workerThreads.add(new WorkerThread(threadGroup)); 601 } 602 for (int i = 0; i < urgentNumThreads; ++i) { 603 urgentWorkerThreads 604 .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true)); 605 } 606 607 long st, et; 608 609 // Acquire the store lease. 610 st = System.nanoTime(); 611 store.recoverLease(); 612 et = System.nanoTime(); 613 LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), 614 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 615 616 // start the procedure scheduler 617 scheduler.start(); 618 619 // TODO: Split in two steps. 620 // TODO: Handle corrupted procedures (currently just a warn) 621 // The first one will make sure that we have the latest id, 622 // so we can start the threads and accept new procedures. 623 // The second step will do the actual load of old procedures. 
624 st = System.nanoTime(); 625 load(abortOnCorruption); 626 et = System.nanoTime(); 627 LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), 628 StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); 629 } 630 631 /** 632 * Start the workers. 633 */ 634 public void startWorkers() throws IOException { 635 if (!running.compareAndSet(false, true)) { 636 LOG.warn("Already running"); 637 return; 638 } 639 // Start the executors. Here we must have the lastProcId set. 640 LOG.debug("Start workers {}, urgent workers {}", workerThreads.size(), 641 urgentWorkerThreads.size()); 642 timeoutExecutor.start(); 643 for (WorkerThread worker: workerThreads) { 644 worker.start(); 645 } 646 647 for (WorkerThread worker: urgentWorkerThreads) { 648 worker.start(); 649 } 650 651 // Internal chores 652 timeoutExecutor.add(new WorkerMonitor()); 653 654 if (upgradeTo2_2) { 655 timeoutExecutor.add(new InlineChore() { 656 657 @Override 658 public void run() { 659 if (procedures.isEmpty()) { 660 LOG.info("UPGRADE OK: All existed procedures have been finished, quit..."); 661 System.exit(0); 662 } 663 } 664 665 @Override 666 public int getTimeoutInterval() { 667 // check every 5 seconds to see if we can quit 668 return 5000; 669 } 670 }); 671 } 672 673 // Add completed cleaner chore 674 addChore(new CompletedProcedureCleaner<>(conf, store, procExecutionLock, completed, 675 nonceKeysToProcIdsMap)); 676 } 677 678 public void stop() { 679 if (!running.getAndSet(false)) { 680 return; 681 } 682 683 LOG.info("Stopping"); 684 scheduler.stop(); 685 timeoutExecutor.sendStopSignal(); 686 } 687 688 @VisibleForTesting 689 public void join() { 690 assert !isRunning() : "expected not running"; 691 692 // stop the timeout executor 693 timeoutExecutor.awaitTermination(); 694 695 // stop the worker threads 696 for (WorkerThread worker: workerThreads) { 697 worker.awaitTermination(); 698 } 699 700 // stop the worker threads 701 for (WorkerThread worker: urgentWorkerThreads) { 702 
worker.awaitTermination(); 703 } 704 705 // Destroy the Thread Group for the executors 706 // TODO: Fix. #join is not place to destroy resources. 707 try { 708 threadGroup.destroy(); 709 } catch (IllegalThreadStateException e) { 710 LOG.error("ThreadGroup {} contains running threads; {}: See STDOUT", 711 this.threadGroup, e.getMessage()); 712 // This dumps list of threads on STDOUT. 713 this.threadGroup.list(); 714 } 715 716 // reset the in-memory state for testing 717 completed.clear(); 718 rollbackStack.clear(); 719 procedures.clear(); 720 nonceKeysToProcIdsMap.clear(); 721 scheduler.clear(); 722 lastProcId.set(-1); 723 } 724 725 public void refreshConfiguration(final Configuration conf) { 726 this.conf = conf; 727 setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, 728 DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS); 729 } 730 731 // ========================================================================== 732 // Accessors 733 // ========================================================================== 734 public boolean isRunning() { 735 return running.get(); 736 } 737 738 /** 739 * @return the current number of worker threads. 740 */ 741 public int getWorkerThreadCount() { 742 return workerThreads.size() + urgentWorkerThreads.size(); 743 } 744 745 /** 746 * @return the core pool size settings. 
747 */ 748 public int getCorePoolSize() { 749 return corePoolSize; 750 } 751 752 public int getUrgentPoolSize() { 753 return urgentPoolSize; 754 } 755 756 public int getActiveExecutorCount() { 757 return activeExecutorCount.get(); 758 } 759 760 public TEnvironment getEnvironment() { 761 return this.environment; 762 } 763 764 public ProcedureStore getStore() { 765 return this.store; 766 } 767 768 ProcedureScheduler getScheduler() { 769 return scheduler; 770 } 771 772 public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { 773 this.keepAliveTime = timeUnit.toMillis(keepAliveTime); 774 this.scheduler.signalAll(); 775 } 776 777 public long getKeepAliveTime(final TimeUnit timeUnit) { 778 return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); 779 } 780 781 // ========================================================================== 782 // Submit/Remove Chores 783 // ========================================================================== 784 785 /** 786 * Add a chore procedure to the executor 787 * @param chore the chore to add 788 */ 789 public void addChore(ProcedureInMemoryChore<TEnvironment> chore) { 790 chore.setState(ProcedureState.WAITING_TIMEOUT); 791 timeoutExecutor.add(chore); 792 } 793 794 /** 795 * Remove a chore procedure from the executor 796 * @param chore the chore to remove 797 * @return whether the chore is removed, or it will be removed later 798 */ 799 public boolean removeChore(ProcedureInMemoryChore<TEnvironment> chore) { 800 chore.setState(ProcedureState.SUCCESS); 801 return timeoutExecutor.remove(chore); 802 } 803 804 // ========================================================================== 805 // Nonce Procedure helpers 806 // ========================================================================== 807 /** 808 * Create a NoneKey from the specified nonceGroup and nonce. 
809 * @param nonceGroup 810 * @param nonce 811 * @return the generated NonceKey 812 */ 813 public NonceKey createNonceKey(final long nonceGroup, final long nonce) { 814 return (nonce == HConstants.NO_NONCE) ? null : new NonceKey(nonceGroup, nonce); 815 } 816 817 /** 818 * Register a nonce for a procedure that is going to be submitted. 819 * A procId will be reserved and on submitProcedure(), 820 * the procedure with the specified nonce will take the reserved ProcId. 821 * If someone already reserved the nonce, this method will return the procId reserved, 822 * otherwise an invalid procId will be returned. and the caller should procede 823 * and submit the procedure. 824 * 825 * @param nonceKey A unique identifier for this operation from the client or process. 826 * @return the procId associated with the nonce, if any otherwise an invalid procId. 827 */ 828 public long registerNonce(final NonceKey nonceKey) { 829 if (nonceKey == null) return -1; 830 831 // check if we have already a Reserved ID for the nonce 832 Long oldProcId = nonceKeysToProcIdsMap.get(nonceKey); 833 if (oldProcId == null) { 834 // reserve a new Procedure ID, this will be associated with the nonce 835 // and the procedure submitted with the specified nonce will use this ID. 836 final long newProcId = nextProcId(); 837 oldProcId = nonceKeysToProcIdsMap.putIfAbsent(nonceKey, newProcId); 838 if (oldProcId == null) return -1; 839 } 840 841 // we found a registered nonce, but the procedure may not have been submitted yet. 842 // since the client expect the procedure to be submitted, spin here until it is. 
843 final boolean traceEnabled = LOG.isTraceEnabled(); 844 while (isRunning() && 845 !(procedures.containsKey(oldProcId) || completed.containsKey(oldProcId)) && 846 nonceKeysToProcIdsMap.containsKey(nonceKey)) { 847 if (traceEnabled) { 848 LOG.trace("Waiting for pid=" + oldProcId.longValue() + " to be submitted"); 849 } 850 Threads.sleep(100); 851 } 852 return oldProcId.longValue(); 853 } 854 855 /** 856 * Remove the NonceKey if the procedure was not submitted to the executor. 857 * @param nonceKey A unique identifier for this operation from the client or process. 858 */ 859 public void unregisterNonceIfProcedureWasNotSubmitted(final NonceKey nonceKey) { 860 if (nonceKey == null) return; 861 862 final Long procId = nonceKeysToProcIdsMap.get(nonceKey); 863 if (procId == null) return; 864 865 // if the procedure was not submitted, remove the nonce 866 if (!(procedures.containsKey(procId) || completed.containsKey(procId))) { 867 nonceKeysToProcIdsMap.remove(nonceKey); 868 } 869 } 870 871 /** 872 * If the failure failed before submitting it, we may want to give back the 873 * same error to the requests with the same nonceKey. 
874 * 875 * @param nonceKey A unique identifier for this operation from the client or process 876 * @param procName name of the procedure, used to inform the user 877 * @param procOwner name of the owner of the procedure, used to inform the user 878 * @param exception the failure to report to the user 879 */ 880 public void setFailureResultForNonce(NonceKey nonceKey, String procName, User procOwner, 881 IOException exception) { 882 if (nonceKey == null) { 883 return; 884 } 885 886 Long procId = nonceKeysToProcIdsMap.get(nonceKey); 887 if (procId == null || completed.containsKey(procId)) { 888 return; 889 } 890 891 Procedure<TEnvironment> proc = 892 new FailedProcedure<>(procId.longValue(), procName, procOwner, nonceKey, exception); 893 894 completed.putIfAbsent(procId, new CompletedProcedureRetainer<>(proc)); 895 } 896 897 // ========================================================================== 898 // Submit/Abort Procedure 899 // ========================================================================== 900 /** 901 * Add a new root-procedure to the executor. 902 * @param proc the new procedure to execute. 903 * @return the procedure id, that can be used to monitor the operation 904 */ 905 public long submitProcedure(Procedure<TEnvironment> proc) { 906 return submitProcedure(proc, null); 907 } 908 909 /** 910 * Bypass a procedure. If the procedure is set to bypass, all the logic in 911 * execute/rollback will be ignored and it will return success, whatever. 912 * It is used to recover buggy stuck procedures, releasing the lock resources 913 * and letting other procedures run. Bypassing one procedure (and its ancestors will 914 * be bypassed automatically) may leave the cluster in a middle state, e.g. region 915 * not assigned, or some hdfs files left behind. After getting rid of those stuck procedures, 916 * the operators may have to do some clean up on hdfs or schedule some assign procedures 917 * to let region online. DO AT YOUR OWN RISK. 
918 * <p> 919 * A procedure can be bypassed only if 920 * 1. The procedure is in state of RUNNABLE, WAITING, WAITING_TIMEOUT 921 * or it is a root procedure without any child. 922 * 2. No other worker thread is executing it 923 * 3. No child procedure has been submitted 924 * 925 * <p> 926 * If all the requirements are meet, the procedure and its ancestors will be 927 * bypassed and persisted to WAL. 928 * 929 * <p> 930 * If the procedure is in WAITING state, will set it to RUNNABLE add it to run queue. 931 * TODO: What about WAITING_TIMEOUT? 932 * @param pids the procedure ids 933 * @param lockWait time to wait lock 934 * @param force if force set to true, we will bypass the procedure even if it is executing. 935 * This is for procedures which can't break out during executing(due to bug, mostly) 936 * In this case, bypassing the procedure is not enough, since it is already stuck 937 * there. We need to restart the master after bypassing, and letting the problematic 938 * procedure to execute wth bypass=true, so in that condition, the procedure can be 939 * successfully bypassed. 940 * @param recursive We will do an expensive search for children of each pid. EXPENSIVE! 
941 * @return true if bypass success 942 * @throws IOException IOException 943 */ 944 public List<Boolean> bypassProcedure(List<Long> pids, long lockWait, boolean force, 945 boolean recursive) 946 throws IOException { 947 List<Boolean> result = new ArrayList<Boolean>(pids.size()); 948 for(long pid: pids) { 949 result.add(bypassProcedure(pid, lockWait, force, recursive)); 950 } 951 return result; 952 } 953 954 boolean bypassProcedure(long pid, long lockWait, boolean override, boolean recursive) 955 throws IOException { 956 Preconditions.checkArgument(lockWait > 0, "lockWait should be positive"); 957 final Procedure<TEnvironment> procedure = getProcedure(pid); 958 if (procedure == null) { 959 LOG.debug("Procedure pid={} does not exist, skipping bypass", pid); 960 return false; 961 } 962 963 LOG.info("Begin bypass {} with lockWait={}, override={}, recursive={}", 964 procedure, lockWait, override, recursive); 965 IdLock.Entry lockEntry = procExecutionLock.tryLockEntry(procedure.getProcId(), lockWait); 966 if (lockEntry == null && !override) { 967 LOG.info("Waited {} ms, but {} is still running, skipping bypass with force={}", 968 lockWait, procedure, override); 969 return false; 970 } else if (lockEntry == null) { 971 LOG.info("Waited {} ms, but {} is still running, begin bypass with force={}", 972 lockWait, procedure, override); 973 } 974 try { 975 // check whether the procedure is already finished 976 if (procedure.isFinished()) { 977 LOG.info("{} is already finished, skipping bypass", procedure); 978 return false; 979 } 980 981 if (procedure.hasChildren()) { 982 if (recursive) { 983 // EXPENSIVE. Checks each live procedure of which there could be many!!! 984 // Is there another way to get children of a procedure? 985 LOG.info("Recursive bypass on children of pid={}", procedure.getProcId()); 986 this.procedures.forEachValue(1 /*Single-threaded*/, 987 // Transformer 988 v -> { 989 return v.getParentProcId() == procedure.getProcId()? 
v: null; 990 }, 991 // Consumer 992 v -> { 993 boolean result = false; 994 IOException ioe = null; 995 try { 996 result = bypassProcedure(v.getProcId(), lockWait, override, recursive); 997 } catch (IOException e) { 998 LOG.warn("Recursive bypass of pid={}", v.getProcId(), e); 999 } 1000 }); 1001 } else { 1002 LOG.info("{} has children, skipping bypass", procedure); 1003 return false; 1004 } 1005 } 1006 1007 // If the procedure has no parent or no child, we are safe to bypass it in whatever state 1008 if (procedure.hasParent() && procedure.getState() != ProcedureState.RUNNABLE 1009 && procedure.getState() != ProcedureState.WAITING 1010 && procedure.getState() != ProcedureState.WAITING_TIMEOUT) { 1011 LOG.info("Bypassing procedures in RUNNABLE, WAITING and WAITING_TIMEOUT states " 1012 + "(with no parent), {}", 1013 procedure); 1014 // Question: how is the bypass done here? 1015 return false; 1016 } 1017 1018 // Now, the procedure is not finished, and no one can execute it since we take the lock now 1019 // And we can be sure that its ancestor is not running too, since their child has not 1020 // finished yet 1021 Procedure<TEnvironment> current = procedure; 1022 while (current != null) { 1023 LOG.info("Bypassing {}", current); 1024 current.bypass(getEnvironment()); 1025 store.update(procedure); 1026 long parentID = current.getParentProcId(); 1027 current = getProcedure(parentID); 1028 } 1029 1030 //wake up waiting procedure, already checked there is no child 1031 if (procedure.getState() == ProcedureState.WAITING) { 1032 procedure.setState(ProcedureState.RUNNABLE); 1033 store.update(procedure); 1034 } 1035 1036 // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. 
1037 // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE 1038 if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1039 LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); 1040 if (timeoutExecutor.remove(procedure)) { 1041 LOG.debug("removed procedure {} from timeoutExecutor", procedure); 1042 timeoutExecutor.executeTimedoutProcedure(procedure); 1043 } 1044 } else if (lockEntry != null) { 1045 scheduler.addFront(procedure); 1046 LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure); 1047 } else { 1048 // If we don't have the lock, we can't re-submit the queue, 1049 // since it is already executing. To get rid of the stuck situation, we 1050 // need to restart the master. With the procedure set to bypass, the procedureExecutor 1051 // will bypass it and won't get stuck again. 1052 LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, " 1053 + "skipping add to queue", procedure); 1054 } 1055 return true; 1056 1057 } finally { 1058 if (lockEntry != null) { 1059 procExecutionLock.releaseLockEntry(lockEntry); 1060 } 1061 } 1062 } 1063 1064 /** 1065 * Add a new root-procedure to the executor. 1066 * @param proc the new procedure to execute. 1067 * @param nonceKey the registered unique identifier for this operation from the client or process. 
1068 * @return the procedure id, that can be used to monitor the operation 1069 */ 1070 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", 1071 justification = "FindBugs is blind to the check-for-null") 1072 public long submitProcedure(Procedure<TEnvironment> proc, NonceKey nonceKey) { 1073 Preconditions.checkArgument(lastProcId.get() >= 0); 1074 1075 prepareProcedure(proc); 1076 1077 final Long currentProcId; 1078 if (nonceKey != null) { 1079 currentProcId = nonceKeysToProcIdsMap.get(nonceKey); 1080 Preconditions.checkArgument(currentProcId != null, 1081 "Expected nonceKey=" + nonceKey + " to be reserved, use registerNonce(); proc=" + proc); 1082 } else { 1083 currentProcId = nextProcId(); 1084 } 1085 1086 // Initialize the procedure 1087 proc.setNonceKey(nonceKey); 1088 proc.setProcId(currentProcId.longValue()); 1089 1090 // Commit the transaction 1091 store.insert(proc, null); 1092 LOG.debug("Stored {}", proc); 1093 1094 // Add the procedure to the executor 1095 return pushProcedure(proc); 1096 } 1097 1098 /** 1099 * Add a set of new root-procedure to the executor. 1100 * @param procs the new procedures to execute. 1101 */ 1102 // TODO: Do we need to take nonces here? 
1103 public void submitProcedures(Procedure<TEnvironment>[] procs) { 1104 Preconditions.checkArgument(lastProcId.get() >= 0); 1105 if (procs == null || procs.length <= 0) { 1106 return; 1107 } 1108 1109 // Prepare procedure 1110 for (int i = 0; i < procs.length; ++i) { 1111 prepareProcedure(procs[i]).setProcId(nextProcId()); 1112 } 1113 1114 // Commit the transaction 1115 store.insert(procs); 1116 if (LOG.isDebugEnabled()) { 1117 LOG.debug("Stored " + Arrays.toString(procs)); 1118 } 1119 // Add the procedure to the executor 1120 for (Procedure<TEnvironment> proc : procs) { 1121 pushProcedure(proc); 1122 } 1123 } 1124 1125 private Procedure<TEnvironment> prepareProcedure(Procedure<TEnvironment> proc) { 1126 Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); 1127 Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); 1128 if (this.checkOwnerSet) { 1129 Preconditions.checkArgument(proc.hasOwner(), "missing owner"); 1130 } 1131 return proc; 1132 } 1133 1134 private long pushProcedure(Procedure<TEnvironment> proc) { 1135 long currentProcId = proc.getProcId(); 1136 // If we are going to upgrade to 2.2+, and this is not a sub procedure, do not push it to 1137 // scheduler. After we finish all the ongoing procedures, the master will quit. 1138 if (upgradeTo2_2 && !proc.hasParent()) { 1139 return currentProcId; 1140 } 1141 1142 // Update metrics on start of a procedure 1143 proc.updateMetricsOnSubmit(getEnvironment()); 1144 1145 // Create the rollback stack for the procedure 1146 RootProcedureState<TEnvironment> stack = new RootProcedureState<>(); 1147 rollbackStack.put(currentProcId, stack); 1148 1149 // Submit the new subprocedures 1150 assert !procedures.containsKey(currentProcId); 1151 procedures.put(currentProcId, proc); 1152 sendProcedureAddedNotification(currentProcId); 1153 scheduler.addBack(proc); 1154 return proc.getProcId(); 1155 } 1156 1157 /** 1158 * Send an abort notification the specified procedure. 
1159 * Depending on the procedure implementation the abort can be considered or ignored. 1160 * @param procId the procedure to abort 1161 * @return true if the procedure exists and has received the abort, otherwise false. 1162 */ 1163 public boolean abort(long procId) { 1164 return abort(procId, true); 1165 } 1166 1167 /** 1168 * Send an abort notification to the specified procedure. 1169 * Depending on the procedure implementation, the abort can be considered or ignored. 1170 * @param procId the procedure to abort 1171 * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? 1172 * @return true if the procedure exists and has received the abort, otherwise false. 1173 */ 1174 public boolean abort(long procId, boolean mayInterruptIfRunning) { 1175 Procedure<TEnvironment> proc = procedures.get(procId); 1176 if (proc != null) { 1177 if (!mayInterruptIfRunning && proc.wasExecuted()) { 1178 return false; 1179 } 1180 return proc.abort(getEnvironment()); 1181 } 1182 return false; 1183 } 1184 1185 // ========================================================================== 1186 // Executor query helpers 1187 // ========================================================================== 1188 public Procedure<TEnvironment> getProcedure(final long procId) { 1189 return procedures.get(procId); 1190 } 1191 1192 public <T extends Procedure<TEnvironment>> T getProcedure(Class<T> clazz, long procId) { 1193 Procedure<TEnvironment> proc = getProcedure(procId); 1194 if (clazz.isInstance(proc)) { 1195 return clazz.cast(proc); 1196 } 1197 return null; 1198 } 1199 1200 public Procedure<TEnvironment> getResult(long procId) { 1201 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1202 if (retainer == null) { 1203 return null; 1204 } else { 1205 return retainer.getProcedure(); 1206 } 1207 } 1208 1209 /** 1210 * Return true if the procedure is finished. 1211 * The state may be "completed successfully" or "failed and rolledback". 
1212 * Use getResult() to check the state or get the result data. 1213 * @param procId the ID of the procedure to check 1214 * @return true if the procedure execution is finished, otherwise false. 1215 */ 1216 public boolean isFinished(final long procId) { 1217 return !procedures.containsKey(procId); 1218 } 1219 1220 /** 1221 * Return true if the procedure is started. 1222 * @param procId the ID of the procedure to check 1223 * @return true if the procedure execution is started, otherwise false. 1224 */ 1225 public boolean isStarted(long procId) { 1226 Procedure<?> proc = procedures.get(procId); 1227 if (proc == null) { 1228 return completed.get(procId) != null; 1229 } 1230 return proc.wasExecuted(); 1231 } 1232 1233 /** 1234 * Mark the specified completed procedure, as ready to remove. 1235 * @param procId the ID of the procedure to remove 1236 */ 1237 public void removeResult(long procId) { 1238 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1239 if (retainer == null) { 1240 assert !procedures.containsKey(procId) : "pid=" + procId + " is still running"; 1241 LOG.debug("pid={} already removed by the cleaner.", procId); 1242 return; 1243 } 1244 1245 // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 1246 retainer.setClientAckTime(EnvironmentEdgeManager.currentTime()); 1247 } 1248 1249 public Procedure<TEnvironment> getResultOrProcedure(long procId) { 1250 CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1251 if (retainer == null) { 1252 return procedures.get(procId); 1253 } else { 1254 return retainer.getProcedure(); 1255 } 1256 } 1257 1258 /** 1259 * Check if the user is this procedure's owner 1260 * @param procId the target procedure 1261 * @param user the user 1262 * @return true if the user is the owner of the procedure, 1263 * false otherwise or the owner is unknown. 
1264 */ 1265 public boolean isProcedureOwner(long procId, User user) { 1266 if (user == null) { 1267 return false; 1268 } 1269 final Procedure<TEnvironment> runningProc = procedures.get(procId); 1270 if (runningProc != null) { 1271 return runningProc.getOwner().equals(user.getShortName()); 1272 } 1273 1274 final CompletedProcedureRetainer<TEnvironment> retainer = completed.get(procId); 1275 if (retainer != null) { 1276 return retainer.getProcedure().getOwner().equals(user.getShortName()); 1277 } 1278 1279 // Procedure either does not exist or has already completed and got cleaned up. 1280 // At this time, we cannot check the owner of the procedure 1281 return false; 1282 } 1283 1284 1285 /** 1286 * Should only be used when starting up, where the procedure workers have not been started. 1287 * <p/> 1288 * If the procedure works has been started, the return values maybe changed when you are 1289 * processing it so usually this is not safe. Use {@link #getProcedures()} below for most cases as 1290 * it will do a copy, and also include the finished procedures. 1291 */ 1292 public Collection<Procedure<TEnvironment>> getActiveProceduresNoCopy() { 1293 return procedures.values(); 1294 } 1295 1296 /** 1297 * Get procedures. 1298 * @return the procedures in a list 1299 */ 1300 public List<Procedure<TEnvironment>> getProcedures() { 1301 List<Procedure<TEnvironment>> procedureList = 1302 new ArrayList<>(procedures.size() + completed.size()); 1303 procedureList.addAll(procedures.values()); 1304 // Note: The procedure could show up twice in the list with different state, as 1305 // it could complete after we walk through procedures list and insert into 1306 // procedureList - it is ok, as we will use the information in the Procedure 1307 // to figure it out; to prevent this would increase the complexity of the logic. 
1308 completed.values().stream().map(CompletedProcedureRetainer::getProcedure) 1309 .forEach(procedureList::add); 1310 return procedureList; 1311 } 1312 1313 // ========================================================================== 1314 // Listeners helpers 1315 // ========================================================================== 1316 public void registerListener(ProcedureExecutorListener listener) { 1317 this.listeners.add(listener); 1318 } 1319 1320 public boolean unregisterListener(ProcedureExecutorListener listener) { 1321 return this.listeners.remove(listener); 1322 } 1323 1324 private void sendProcedureLoadedNotification(final long procId) { 1325 if (!this.listeners.isEmpty()) { 1326 for (ProcedureExecutorListener listener: this.listeners) { 1327 try { 1328 listener.procedureLoaded(procId); 1329 } catch (Throwable e) { 1330 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1331 } 1332 } 1333 } 1334 } 1335 1336 private void sendProcedureAddedNotification(final long procId) { 1337 if (!this.listeners.isEmpty()) { 1338 for (ProcedureExecutorListener listener: this.listeners) { 1339 try { 1340 listener.procedureAdded(procId); 1341 } catch (Throwable e) { 1342 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1343 } 1344 } 1345 } 1346 } 1347 1348 private void sendProcedureFinishedNotification(final long procId) { 1349 if (!this.listeners.isEmpty()) { 1350 for (ProcedureExecutorListener listener: this.listeners) { 1351 try { 1352 listener.procedureFinished(procId); 1353 } catch (Throwable e) { 1354 LOG.error("Listener " + listener + " had an error: " + e.getMessage(), e); 1355 } 1356 } 1357 } 1358 } 1359 1360 // ========================================================================== 1361 // Procedure IDs helpers 1362 // ========================================================================== 1363 private long nextProcId() { 1364 long procId = lastProcId.incrementAndGet(); 1365 if (procId < 0) { 
1366 while (!lastProcId.compareAndSet(procId, 0)) { 1367 procId = lastProcId.get(); 1368 if (procId >= 0) 1369 break; 1370 } 1371 while (procedures.containsKey(procId)) { 1372 procId = lastProcId.incrementAndGet(); 1373 } 1374 } 1375 assert procId >= 0 : "Invalid procId " + procId; 1376 return procId; 1377 } 1378 1379 @VisibleForTesting 1380 protected long getLastProcId() { 1381 return lastProcId.get(); 1382 } 1383 1384 @VisibleForTesting 1385 public Set<Long> getActiveProcIds() { 1386 return procedures.keySet(); 1387 } 1388 1389 Long getRootProcedureId(Procedure<TEnvironment> proc) { 1390 return Procedure.getRootProcedureId(procedures, proc); 1391 } 1392 1393 // ========================================================================== 1394 // Executions 1395 // ========================================================================== 1396 private void executeProcedure(Procedure<TEnvironment> proc) { 1397 if (proc.isFinished()) { 1398 LOG.debug("{} is already finished, skipping execution", proc); 1399 return; 1400 } 1401 final Long rootProcId = getRootProcedureId(proc); 1402 if (rootProcId == null) { 1403 // The 'proc' was ready to run but the root procedure was rolledback 1404 LOG.warn("Rollback because parent is done/rolledback proc=" + proc); 1405 executeRollback(proc); 1406 return; 1407 } 1408 1409 RootProcedureState<TEnvironment> procStack = rollbackStack.get(rootProcId); 1410 if (procStack == null) { 1411 LOG.warn("RootProcedureState is null for " + proc.getProcId()); 1412 return; 1413 } 1414 do { 1415 // Try to acquire the execution 1416 if (!procStack.acquire(proc)) { 1417 if (procStack.setRollback()) { 1418 // we have the 'rollback-lock' we can start rollingback 1419 switch (executeRollback(rootProcId, procStack)) { 1420 case LOCK_ACQUIRED: 1421 break; 1422 case LOCK_YIELD_WAIT: 1423 procStack.unsetRollback(); 1424 scheduler.yield(proc); 1425 break; 1426 case LOCK_EVENT_WAIT: 1427 LOG.info("LOCK_EVENT_WAIT rollback..." 
+ proc); 1428 procStack.unsetRollback(); 1429 break; 1430 default: 1431 throw new UnsupportedOperationException(); 1432 } 1433 } else { 1434 // if we can't rollback means that some child is still running. 1435 // the rollback will be executed after all the children are done. 1436 // If the procedure was never executed, remove and mark it as rolledback. 1437 if (!proc.wasExecuted()) { 1438 switch (executeRollback(proc)) { 1439 case LOCK_ACQUIRED: 1440 break; 1441 case LOCK_YIELD_WAIT: 1442 scheduler.yield(proc); 1443 break; 1444 case LOCK_EVENT_WAIT: 1445 LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); 1446 break; 1447 default: 1448 throw new UnsupportedOperationException(); 1449 } 1450 } 1451 } 1452 break; 1453 } 1454 1455 // Execute the procedure 1456 assert proc.getState() == ProcedureState.RUNNABLE : proc; 1457 // Note that lock is NOT about concurrency but rather about ensuring 1458 // ownership of a procedure of an entity such as a region or table 1459 LockState lockState = acquireLock(proc); 1460 switch (lockState) { 1461 case LOCK_ACQUIRED: 1462 execProcedure(procStack, proc); 1463 break; 1464 case LOCK_YIELD_WAIT: 1465 LOG.info(lockState + " " + proc); 1466 scheduler.yield(proc); 1467 break; 1468 case LOCK_EVENT_WAIT: 1469 // Someone will wake us up when the lock is available 1470 LOG.debug(lockState + " " + proc); 1471 break; 1472 default: 1473 throw new UnsupportedOperationException(); 1474 } 1475 procStack.release(proc); 1476 1477 if (proc.isSuccess()) { 1478 // update metrics on finishing the procedure. 1479 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); 1480 // Print out count of outstanding siblings if this procedure has a parent. 1481 Procedure<TEnvironment> parent = null; 1482 if (proc.hasParent()) { 1483 parent = procedures.get(proc.getParentProcId()); 1484 } 1485 LOG.info("Finished {} in {}{}", 1486 proc, 1487 StringUtils.humanTimeDiff(proc.elapsedTime()), 1488 parent != null? 
(", unfinishedSiblingCount=" + parent.getChildrenLatch()): ""); 1489 // Finalize the procedure state 1490 if (proc.getProcId() == rootProcId) { 1491 procedureFinished(proc); 1492 } else { 1493 execCompletionCleanup(proc); 1494 } 1495 break; 1496 } 1497 } while (procStack.isFailed()); 1498 } 1499 1500 private LockState acquireLock(Procedure<TEnvironment> proc) { 1501 TEnvironment env = getEnvironment(); 1502 // if holdLock is true, then maybe we already have the lock, so just return LOCK_ACQUIRED if 1503 // hasLock is true. 1504 if (proc.hasLock()) { 1505 return LockState.LOCK_ACQUIRED; 1506 } 1507 return proc.doAcquireLock(env, store); 1508 } 1509 1510 private void releaseLock(Procedure<TEnvironment> proc, boolean force) { 1511 TEnvironment env = getEnvironment(); 1512 // For how the framework works, we know that we will always have the lock 1513 // when we call releaseLock(), so we can avoid calling proc.hasLock() 1514 if (force || !proc.holdLock(env) || proc.isFinished()) { 1515 proc.doReleaseLock(env, store); 1516 } 1517 } 1518 1519 /** 1520 * Execute the rollback of the full procedure stack. Once the procedure is rolledback, the 1521 * root-procedure will be visible as finished to user, and the result will be the fatal exception. 1522 */ 1523 private LockState executeRollback(long rootProcId, RootProcedureState<TEnvironment> procStack) { 1524 Procedure<TEnvironment> rootProc = procedures.get(rootProcId); 1525 RemoteProcedureException exception = rootProc.getException(); 1526 // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are 1527 // rolling back because the subprocedure does. Clarify. 
1528 if (exception == null) { 1529 exception = procStack.getException(); 1530 rootProc.setFailure(exception); 1531 store.update(rootProc); 1532 } 1533 1534 List<Procedure<TEnvironment>> subprocStack = procStack.getSubproceduresStack(); 1535 assert subprocStack != null : "Called rollback with no steps executed rootProc=" + rootProc; 1536 1537 int stackTail = subprocStack.size(); 1538 while (stackTail-- > 0) { 1539 Procedure<TEnvironment> proc = subprocStack.get(stackTail); 1540 IdLock.Entry lockEntry = null; 1541 // Hold the execution lock if it is not held by us. The IdLock is not reentrant so we need 1542 // this check, as the worker will hold the lock before executing a procedure. This is the only 1543 // place where we may hold two procedure execution locks, and there is a fence in the 1544 // RootProcedureState where we can make sure that only one worker can execute the rollback of 1545 // a RootProcedureState, so there is no dead lock problem. And the lock here is necessary to 1546 // prevent race between us and the force update thread. 1547 if (!procExecutionLock.isHeldByCurrentThread(proc.getProcId())) { 1548 try { 1549 lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 1550 } catch (IOException e) { 1551 // can only happen if interrupted, so not a big deal to propagate it 1552 throw new UncheckedIOException(e); 1553 } 1554 } 1555 try { 1556 // For the sub procedures which are successfully finished, we do not rollback them. 1557 // Typically, if we want to rollback a procedure, we first need to rollback it, and then 1558 // recursively rollback its ancestors. The state changes which are done by sub procedures 1559 // should be handled by parent procedures when rolling back. For example, when rolling back 1560 // a MergeTableProcedure, we will schedule new procedures to bring the offline regions 1561 // online, instead of rolling back the original procedures which offlined the regions(in 1562 // fact these procedures can not be rolled back...). 
1563 if (proc.isSuccess()) { 1564 // Just do the cleanup work, without actually executing the rollback 1565 subprocStack.remove(stackTail); 1566 cleanupAfterRollbackOneStep(proc); 1567 continue; 1568 } 1569 LockState lockState = acquireLock(proc); 1570 if (lockState != LockState.LOCK_ACQUIRED) { 1571 // can't take a lock on the procedure, add the root-proc back on the 1572 // queue waiting for the lock availability 1573 return lockState; 1574 } 1575 1576 lockState = executeRollback(proc); 1577 releaseLock(proc, false); 1578 boolean abortRollback = lockState != LockState.LOCK_ACQUIRED; 1579 abortRollback |= !isRunning() || !store.isRunning(); 1580 1581 // allows to kill the executor before something is stored to the wal. 1582 // useful to test the procedure recovery. 1583 if (abortRollback) { 1584 return lockState; 1585 } 1586 1587 subprocStack.remove(stackTail); 1588 1589 // if the procedure is kind enough to pass the slot to someone else, yield 1590 // if the proc is already finished, do not yield 1591 if (!proc.isFinished() && proc.isYieldAfterExecutionStep(getEnvironment())) { 1592 return LockState.LOCK_YIELD_WAIT; 1593 } 1594 1595 if (proc != rootProc) { 1596 execCompletionCleanup(proc); 1597 } 1598 } finally { 1599 if (lockEntry != null) { 1600 procExecutionLock.releaseLockEntry(lockEntry); 1601 } 1602 } 1603 } 1604 1605 // Finalize the procedure state 1606 LOG.info("Rolled back {} exec-time={}", rootProc, 1607 StringUtils.humanTimeDiff(rootProc.elapsedTime())); 1608 procedureFinished(rootProc); 1609 return LockState.LOCK_ACQUIRED; 1610 } 1611 1612 private void cleanupAfterRollbackOneStep(Procedure<TEnvironment> proc) { 1613 if (proc.removeStackIndex()) { 1614 if (!proc.isSuccess()) { 1615 proc.setState(ProcedureState.ROLLEDBACK); 1616 } 1617 1618 // update metrics on finishing the procedure (fail) 1619 proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), false); 1620 1621 if (proc.hasParent()) { 1622 store.delete(proc.getProcId()); 1623 
procedures.remove(proc.getProcId()); 1624 } else { 1625 final long[] childProcIds = rollbackStack.get(proc.getProcId()).getSubprocedureIds(); 1626 if (childProcIds != null) { 1627 store.delete(proc, childProcIds); 1628 } else { 1629 store.update(proc); 1630 } 1631 } 1632 } else { 1633 store.update(proc); 1634 } 1635 } 1636 1637 /** 1638 * Execute the rollback of the procedure step. 1639 * It updates the store with the new state (stack index) 1640 * or will remove completly the procedure in case it is a child. 1641 */ 1642 private LockState executeRollback(Procedure<TEnvironment> proc) { 1643 try { 1644 proc.doRollback(getEnvironment()); 1645 } catch (IOException e) { 1646 LOG.debug("Roll back attempt failed for {}", proc, e); 1647 return LockState.LOCK_YIELD_WAIT; 1648 } catch (InterruptedException e) { 1649 handleInterruptedException(proc, e); 1650 return LockState.LOCK_YIELD_WAIT; 1651 } catch (Throwable e) { 1652 // Catch NullPointerExceptions or similar errors... 1653 LOG.error(HBaseMarkers.FATAL, "CODE-BUG: Uncaught runtime exception for " + proc, e); 1654 } 1655 1656 // allows to kill the executor before something is stored to the wal. 1657 // useful to test the procedure recovery. 1658 if (testing != null && testing.shouldKillBeforeStoreUpdate()) { 1659 String msg = "TESTING: Kill before store update"; 1660 LOG.debug(msg); 1661 stop(); 1662 throw new RuntimeException(msg); 1663 } 1664 1665 cleanupAfterRollbackOneStep(proc); 1666 1667 return LockState.LOCK_ACQUIRED; 1668 } 1669 1670 private void yieldProcedure(Procedure<TEnvironment> proc) { 1671 releaseLock(proc, false); 1672 scheduler.yield(proc); 1673 } 1674 1675 /** 1676 * Executes <code>procedure</code> 1677 * <ul> 1678 * <li>Calls the doExecute() of the procedure 1679 * <li>If the procedure execution didn't fail (i.e. valid user input) 1680 * <ul> 1681 * <li>...and returned subprocedures 1682 * <ul><li>The subprocedures are initialized. 
1683 * <li>The subprocedures are added to the store 1684 * <li>The subprocedures are added to the runnable queue 1685 * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete 1686 * </ul> 1687 * </li> 1688 * <li>...if there are no subprocedure 1689 * <ul><li>the procedure completed successfully 1690 * <li>if there is a parent (WAITING) 1691 * <li>the parent state will be set to RUNNABLE 1692 * </ul> 1693 * </li> 1694 * </ul> 1695 * </li> 1696 * <li>In case of failure 1697 * <ul> 1698 * <li>The store is updated with the new state</li> 1699 * <li>The executor (caller of this method) will start the rollback of the procedure</li> 1700 * </ul> 1701 * </li> 1702 * </ul> 1703 */ 1704 private void execProcedure(RootProcedureState<TEnvironment> procStack, 1705 Procedure<TEnvironment> procedure) { 1706 Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE, 1707 "NOT RUNNABLE! " + procedure.toString()); 1708 1709 // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. 1710 // The exception is caught below and then we hurry to the exit without disturbing state. The 1711 // idea is that the processing of this procedure will be unsuspended later by an external event 1712 // such the report of a region open. 1713 boolean suspended = false; 1714 1715 // Whether to 're-' -execute; run through the loop again. 
1716 boolean reExecute = false; 1717 1718 Procedure<TEnvironment>[] subprocs = null; 1719 do { 1720 reExecute = false; 1721 procedure.resetPersistence(); 1722 try { 1723 subprocs = procedure.doExecute(getEnvironment()); 1724 if (subprocs != null && subprocs.length == 0) { 1725 subprocs = null; 1726 } 1727 } catch (ProcedureSuspendedException e) { 1728 LOG.trace("Suspend {}", procedure); 1729 suspended = true; 1730 } catch (ProcedureYieldException e) { 1731 LOG.trace("Yield {}", procedure, e); 1732 yieldProcedure(procedure); 1733 return; 1734 } catch (InterruptedException e) { 1735 LOG.trace("Yield interrupt {}", procedure, e); 1736 handleInterruptedException(procedure, e); 1737 yieldProcedure(procedure); 1738 return; 1739 } catch (Throwable e) { 1740 // Catch NullPointerExceptions or similar errors... 1741 String msg = "CODE-BUG: Uncaught runtime exception: " + procedure; 1742 LOG.error(msg, e); 1743 procedure.setFailure(new RemoteProcedureException(msg, e)); 1744 } 1745 1746 if (!procedure.isFailed()) { 1747 if (subprocs != null) { 1748 if (subprocs.length == 1 && subprocs[0] == procedure) { 1749 // Procedure returned itself. Quick-shortcut for a state machine-like procedure; 1750 // i.e. we go around this loop again rather than go back out on the scheduler queue. 1751 subprocs = null; 1752 reExecute = true; 1753 LOG.trace("Short-circuit to next step on pid={}", procedure.getProcId()); 1754 } else { 1755 // Yield the current procedure, and make the subprocedure runnable 1756 // subprocs may come back 'null'. 1757 subprocs = initializeChildren(procStack, procedure, subprocs); 1758 LOG.info("Initialized subprocedures=" + 1759 (subprocs == null? null: 1760 Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). 
1761 collect(Collectors.toList()).toString())); 1762 } 1763 } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { 1764 LOG.trace("Added to timeoutExecutor {}", procedure); 1765 timeoutExecutor.add(procedure); 1766 } else if (!suspended) { 1767 // No subtask, so we are done 1768 procedure.setState(ProcedureState.SUCCESS); 1769 } 1770 } 1771 1772 // Add the procedure to the stack 1773 procStack.addRollbackStep(procedure); 1774 1775 // allows to kill the executor before something is stored to the wal. 1776 // useful to test the procedure recovery. 1777 if (testing != null && testing.shouldKillBeforeStoreUpdate(suspended)) { 1778 kill("TESTING: Kill BEFORE store update: " + procedure); 1779 } 1780 1781 // TODO: The code here doesn't check if store is running before persisting to the store as 1782 // it relies on the method call below to throw RuntimeException to wind up the stack and 1783 // executor thread to stop. The statement following the method call below seems to check if 1784 // store is not running, to prevent scheduling children procedures, re-execution or yield 1785 // of this procedure. This may need more scrutiny and subsequent cleanup in future 1786 // 1787 // Commit the transaction even if a suspend (state may have changed). Note this append 1788 // can take a bunch of time to complete. 
1789 if (procedure.needPersistence()) { 1790 updateStoreOnExec(procStack, procedure, subprocs); 1791 } 1792 1793 // if the store is not running we are aborting 1794 if (!store.isRunning()) { 1795 return; 1796 } 1797 // if the procedure is kind enough to pass the slot to someone else, yield 1798 if (procedure.isRunnable() && !suspended && 1799 procedure.isYieldAfterExecutionStep(getEnvironment())) { 1800 yieldProcedure(procedure); 1801 return; 1802 } 1803 1804 assert (reExecute && subprocs == null) || !reExecute; 1805 } while (reExecute); 1806 1807 // Allows to kill the executor after something is stored to the WAL but before the below 1808 // state settings are done -- in particular the one on the end where we make parent 1809 // RUNNABLE again when its children are done; see countDownChildren. 1810 if (testing != null && testing.shouldKillAfterStoreUpdate(suspended)) { 1811 kill("TESTING: Kill AFTER store update: " + procedure); 1812 } 1813 1814 // Submit the new subprocedures 1815 if (subprocs != null && !procedure.isFailed()) { 1816 submitChildrenProcedures(subprocs); 1817 } 1818 1819 // we need to log the release lock operation before waking up the parent procedure, as there 1820 // could be race that the parent procedure may call updateStoreOnExec ahead of us and remove all 1821 // the sub procedures from store and cause problems... 1822 releaseLock(procedure, false); 1823 1824 // if the procedure is complete and has a parent, count down the children latch. 1825 // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. 
1826 if (!suspended && procedure.isFinished() && procedure.hasParent()) { 1827 countDownChildren(procStack, procedure); 1828 } 1829 } 1830 1831 private void kill(String msg) { 1832 LOG.debug(msg); 1833 stop(); 1834 throw new RuntimeException(msg); 1835 } 1836 1837 private Procedure<TEnvironment>[] initializeChildren(RootProcedureState<TEnvironment> procStack, 1838 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1839 assert subprocs != null : "expected subprocedures"; 1840 final long rootProcId = getRootProcedureId(procedure); 1841 for (int i = 0; i < subprocs.length; ++i) { 1842 Procedure<TEnvironment> subproc = subprocs[i]; 1843 if (subproc == null) { 1844 String msg = "subproc[" + i + "] is null, aborting the procedure"; 1845 procedure.setFailure(new RemoteProcedureException(msg, 1846 new IllegalArgumentIOException(msg))); 1847 return null; 1848 } 1849 1850 assert subproc.getState() == ProcedureState.INITIALIZING : subproc; 1851 subproc.setParentProcId(procedure.getProcId()); 1852 subproc.setRootProcId(rootProcId); 1853 subproc.setProcId(nextProcId()); 1854 procStack.addSubProcedure(subproc); 1855 } 1856 1857 if (!procedure.isFailed()) { 1858 procedure.setChildrenLatch(subprocs.length); 1859 switch (procedure.getState()) { 1860 case RUNNABLE: 1861 procedure.setState(ProcedureState.WAITING); 1862 break; 1863 case WAITING_TIMEOUT: 1864 timeoutExecutor.add(procedure); 1865 break; 1866 default: 1867 break; 1868 } 1869 } 1870 return subprocs; 1871 } 1872 1873 private void submitChildrenProcedures(Procedure<TEnvironment>[] subprocs) { 1874 for (int i = 0; i < subprocs.length; ++i) { 1875 Procedure<TEnvironment> subproc = subprocs[i]; 1876 subproc.updateMetricsOnSubmit(getEnvironment()); 1877 assert !procedures.containsKey(subproc.getProcId()); 1878 procedures.put(subproc.getProcId(), subproc); 1879 scheduler.addFront(subproc); 1880 } 1881 } 1882 1883 private void countDownChildren(RootProcedureState<TEnvironment> procStack, 1884 
Procedure<TEnvironment> procedure) { 1885 Procedure<TEnvironment> parent = procedures.get(procedure.getParentProcId()); 1886 if (parent == null) { 1887 assert procStack.isRollingback(); 1888 return; 1889 } 1890 1891 // If this procedure is the last child awake the parent procedure 1892 if (parent.tryRunnable()) { 1893 // If we succeeded in making the parent runnable -- i.e. all of its 1894 // children have completed, move parent to front of the queue. 1895 store.update(parent); 1896 scheduler.addFront(parent); 1897 LOG.info("Finished subprocedure pid={}, resume processing parent {}", 1898 procedure.getProcId(), parent); 1899 return; 1900 } 1901 } 1902 1903 private void updateStoreOnExec(RootProcedureState<TEnvironment> procStack, 1904 Procedure<TEnvironment> procedure, Procedure<TEnvironment>[] subprocs) { 1905 if (subprocs != null && !procedure.isFailed()) { 1906 if (LOG.isTraceEnabled()) { 1907 LOG.trace("Stored " + procedure + ", children " + Arrays.toString(subprocs)); 1908 } 1909 store.insert(procedure, subprocs); 1910 } else { 1911 LOG.trace("Store update {}", procedure); 1912 if (procedure.isFinished() && !procedure.hasParent()) { 1913 // remove child procedures 1914 final long[] childProcIds = procStack.getSubprocedureIds(); 1915 if (childProcIds != null) { 1916 store.delete(procedure, childProcIds); 1917 for (int i = 0; i < childProcIds.length; ++i) { 1918 procedures.remove(childProcIds[i]); 1919 } 1920 } else { 1921 store.update(procedure); 1922 } 1923 } else { 1924 store.update(procedure); 1925 } 1926 } 1927 } 1928 1929 private void handleInterruptedException(Procedure<TEnvironment> proc, InterruptedException e) { 1930 LOG.trace("Interrupt during {}. suspend and retry it later.", proc, e); 1931 // NOTE: We don't call Thread.currentThread().interrupt() 1932 // because otherwise all the subsequent calls e.g. Thread.sleep() will throw 1933 // the InterruptedException. 
If the master is going down, we will be notified 1934 // and the executor/store will be stopped. 1935 // (The interrupted procedure will be retried on the next run) 1936 } 1937 1938 private void execCompletionCleanup(Procedure<TEnvironment> proc) { 1939 final TEnvironment env = getEnvironment(); 1940 if (proc.hasLock()) { 1941 LOG.warn("Usually this should not happen, we will release the lock before if the procedure" + 1942 " is finished, even if the holdLock is true, arrive here means we have some holes where" + 1943 " we do not release the lock. And the releaseLock below may fail since the procedure may" + 1944 " have already been deleted from the procedure store."); 1945 releaseLock(proc, true); 1946 } 1947 try { 1948 proc.completionCleanup(env); 1949 } catch (Throwable e) { 1950 // Catch NullPointerExceptions or similar errors... 1951 LOG.error("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); 1952 } 1953 } 1954 1955 private void procedureFinished(Procedure<TEnvironment> proc) { 1956 // call the procedure completion cleanup handler 1957 execCompletionCleanup(proc); 1958 1959 CompletedProcedureRetainer<TEnvironment> retainer = new CompletedProcedureRetainer<>(proc); 1960 1961 // update the executor internal state maps 1962 if (!proc.shouldWaitClientAck(getEnvironment())) { 1963 retainer.setClientAckTime(0); 1964 } 1965 1966 completed.put(proc.getProcId(), retainer); 1967 rollbackStack.remove(proc.getProcId()); 1968 procedures.remove(proc.getProcId()); 1969 1970 // call the runnableSet completion cleanup handler 1971 try { 1972 scheduler.completionCleanup(proc); 1973 } catch (Throwable e) { 1974 // Catch NullPointerExceptions or similar errors... 
1975 LOG.error("CODE-BUG: uncatched runtime exception for completion cleanup: {}", proc, e); 1976 } 1977 1978 // Notify the listeners 1979 sendProcedureFinishedNotification(proc.getProcId()); 1980 } 1981 1982 RootProcedureState<TEnvironment> getProcStack(long rootProcId) { 1983 return rollbackStack.get(rootProcId); 1984 } 1985 1986 @VisibleForTesting 1987 ProcedureScheduler getProcedureScheduler() { 1988 return scheduler; 1989 } 1990 1991 @VisibleForTesting 1992 int getCompletedSize() { 1993 return completed.size(); 1994 } 1995 1996 // ========================================================================== 1997 // Worker Thread 1998 // ========================================================================== 1999 private class WorkerThread extends StoppableThread { 2000 private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); 2001 private volatile Procedure<TEnvironment> activeProcedure; 2002 private boolean onlyPollUrgent = false; 2003 public WorkerThread(ThreadGroup group) { 2004 this(group, "PEWorker-"); 2005 } 2006 2007 protected WorkerThread(ThreadGroup group, String prefix) { 2008 this(group, prefix, false); 2009 } 2010 2011 protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) { 2012 super(group, prefix + workerId.incrementAndGet()); 2013 this.onlyPollUrgent = onlyPollUrgent; 2014 setDaemon(true); 2015 } 2016 2017 @Override 2018 public void sendStopSignal() { 2019 scheduler.signalAll(); 2020 } 2021 @Override 2022 public void run() { 2023 long lastUpdate = EnvironmentEdgeManager.currentTime(); 2024 try { 2025 while (isRunning() && keepAlive(lastUpdate)) { 2026 Procedure<TEnvironment> proc = scheduler 2027 .poll(onlyPollUrgent, keepAliveTime, TimeUnit.MILLISECONDS); 2028 if (proc == null) { 2029 continue; 2030 } 2031 this.activeProcedure = proc; 2032 int activeCount = activeExecutorCount.incrementAndGet(); 2033 int runningCount = store.setRunningProcedureCount(activeCount); 2034 LOG.trace("Execute pid={} 
runningCount={}, activeCount={}", proc.getProcId(), 2035 runningCount, activeCount); 2036 executionStartTime.set(EnvironmentEdgeManager.currentTime()); 2037 IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId()); 2038 try { 2039 executeProcedure(proc); 2040 } catch (AssertionError e) { 2041 LOG.info("ASSERT pid=" + proc.getProcId(), e); 2042 throw e; 2043 } finally { 2044 procExecutionLock.releaseLockEntry(lockEntry); 2045 activeCount = activeExecutorCount.decrementAndGet(); 2046 runningCount = store.setRunningProcedureCount(activeCount); 2047 LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(), 2048 runningCount, activeCount); 2049 this.activeProcedure = null; 2050 lastUpdate = EnvironmentEdgeManager.currentTime(); 2051 executionStartTime.set(Long.MAX_VALUE); 2052 } 2053 } 2054 } catch (Throwable t) { 2055 LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t); 2056 } finally { 2057 LOG.trace("Worker terminated."); 2058 } 2059 if (onlyPollUrgent) { 2060 urgentWorkerThreads.remove(this); 2061 } else { 2062 workerThreads.remove(this); 2063 } 2064 } 2065 2066 @Override 2067 public String toString() { 2068 Procedure<?> p = this.activeProcedure; 2069 return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); 2070 } 2071 2072 /** 2073 * @return the time since the current procedure is running 2074 */ 2075 public long getCurrentRunTime() { 2076 return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); 2077 } 2078 2079 // core worker never timeout 2080 protected boolean keepAlive(long lastUpdate) { 2081 return true; 2082 } 2083 } 2084 2085 // A worker thread which can be added when core workers are stuck. Will timeout after 2086 // keepAliveTime if there is no procedure to run. 
2087 private final class KeepAliveWorkerThread extends WorkerThread { 2088 2089 public KeepAliveWorkerThread(ThreadGroup group) { 2090 super(group, "KeepAlivePEWorker-"); 2091 } 2092 2093 @Override 2094 protected boolean keepAlive(long lastUpdate) { 2095 return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime; 2096 } 2097 } 2098 2099 // ---------------------------------------------------------------------------- 2100 // TODO-MAYBE: Should we provide a InlineChore to notify the store with the 2101 // full set of procedures pending and completed to write a compacted 2102 // version of the log (in case is a log)? 2103 // In theory no, procedures are have a short life, so at some point the store 2104 // will have the tracker saying everything is in the last log. 2105 // ---------------------------------------------------------------------------- 2106 2107 private final class WorkerMonitor extends InlineChore { 2108 public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = 2109 "hbase.procedure.worker.monitor.interval.msec"; 2110 private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec 2111 2112 public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = 2113 "hbase.procedure.worker.stuck.threshold.msec"; 2114 private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec 2115 2116 public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = 2117 "hbase.procedure.worker.add.stuck.percentage"; 2118 private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck 2119 2120 private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; 2121 private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; 2122 private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; 2123 2124 public WorkerMonitor() { 2125 refreshConfig(); 2126 } 2127 2128 @Override 2129 public void run() { 2130 final int stuckCount = checkForStuckWorkers(); 2131 checkThreadCount(stuckCount); 2132 2133 // refresh 
interval (poor man dynamic conf update) 2134 refreshConfig(); 2135 } 2136 2137 private int checkForStuckWorkers() { 2138 // check if any of the worker is stuck 2139 int stuckCount = 0; 2140 for (WorkerThread worker : workerThreads) { 2141 if (worker.getCurrentRunTime() < stuckThreshold) { 2142 continue; 2143 } 2144 2145 // WARN the worker is stuck 2146 stuckCount++; 2147 LOG.warn("Worker stuck {}, run time {}", worker, 2148 StringUtils.humanTimeDiff(worker.getCurrentRunTime())); 2149 } 2150 return stuckCount; 2151 } 2152 2153 private void checkThreadCount(final int stuckCount) { 2154 // nothing to do if there are no runnable tasks 2155 if (stuckCount < 1 || !scheduler.hasRunnables()) { 2156 return; 2157 } 2158 2159 // add a new thread if the worker stuck percentage exceed the threshold limit 2160 // and every handler is active. 2161 final float stuckPerc = ((float) stuckCount) / workerThreads.size(); 2162 // let's add new worker thread more aggressively, as they will timeout finally if there is no 2163 // work to do. 2164 if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < maxPoolSize) { 2165 final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); 2166 workerThreads.add(worker); 2167 worker.start(); 2168 LOG.debug("Added new worker thread {}", worker); 2169 } 2170 } 2171 2172 private void refreshConfig() { 2173 addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, 2174 DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); 2175 timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, 2176 DEFAULT_WORKER_MONITOR_INTERVAL); 2177 stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, 2178 DEFAULT_WORKER_STUCK_THRESHOLD); 2179 } 2180 2181 @Override 2182 public int getTimeoutInterval() { 2183 return timeoutInterval; 2184 } 2185 } 2186}