001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master; 019 020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; 021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; 022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; 023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE; 024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS; 025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS; 026 027import java.io.IOException; 028import java.util.ArrayList; 029import java.util.Arrays; 030import java.util.Collections; 031import java.util.HashSet; 032import java.util.List; 033import java.util.Map; 034import java.util.Set; 035import java.util.concurrent.ConcurrentHashMap; 036import java.util.concurrent.ConcurrentMap; 037import java.util.concurrent.atomic.AtomicInteger; 038 039import org.apache.hadoop.conf.Configuration; 040import org.apache.hadoop.fs.FileStatus; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.fs.PathFilter; 044import org.apache.hadoop.hbase.ChoreService; 045import org.apache.hadoop.hbase.ScheduledChore; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.SplitLogCounters; 048import org.apache.hadoop.hbase.Stoppable; 049import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; 050import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; 051import org.apache.hadoop.hbase.log.HBaseMarkers; 052import org.apache.hadoop.hbase.monitoring.MonitoredTask; 053import org.apache.hadoop.hbase.monitoring.TaskMonitor; 054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 055import org.apache.hadoop.hbase.util.FSUtils; 056import org.apache.hadoop.hbase.util.HasThread; 057import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; 058import org.apache.yetus.audience.InterfaceAudience; 059import org.slf4j.Logger; 060import org.slf4j.LoggerFactory; 061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 062 063/** 064 * Distributes the task of log splitting to the available region servers. 065 * Coordination happens via coordination engine. For every log file that has to be split a 066 * task is created. SplitLogWorkers race to grab a task. 067 * 068 * <p>SplitLogManager monitors the tasks that it creates using the 069 * timeoutMonitor thread. If a task's progress is slow then 070 * {@link SplitLogManagerCoordination#checkTasks} will take away the 071 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} 072 * and the task will be up for grabs again. When the task is done then it is 073 * deleted by SplitLogManager. 074 * 075 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's 076 * log files. The caller thread waits in this method until all the log files 077 * have been split. 078 * 079 * <p>All the coordination calls made by this class are asynchronous. This is mainly 080 * to help reduce response time seen by the callers. 081 * 082 * <p>There is race in this design between the SplitLogManager and the 083 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality 084 * already been completed by a SplitLogWorker. We rely on the idempotency of 085 * the log splitting task for correctness. 086 * 087 * <p>It is also assumed that every log splitting task is unique and once 088 * completed (either with success or with error) it will be not be submitted 089 * again. If a task is resubmitted then there is a risk that old "delete task" 090 * can delete the re-submission. 091 */ 092@InterfaceAudience.Private 093public class SplitLogManager { 094 private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class); 095 096 private final MasterServices server; 097 098 private final Configuration conf; 099 private final ChoreService choreService; 100 101 public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min 102 103 private long unassignedTimeout; 104 private long lastTaskCreateTime = Long.MAX_VALUE; 105 106 @VisibleForTesting 107 final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>(); 108 private TimeoutMonitor timeoutMonitor; 109 110 private volatile Set<ServerName> deadWorkers = null; 111 private final Object deadWorkersLock = new Object(); 112 113 /** 114 * Its OK to construct this object even when region-servers are not online. It does lookup the 115 * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 116 * @param master the master services 117 * @param conf the HBase configuration 118 * @throws IOException 119 */ 120 public SplitLogManager(MasterServices master, Configuration conf) 121 throws IOException { 122 this.server = master; 123 this.conf = conf; 124 // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread. 125 // For example, in tests. 126 String name = master instanceof HasThread? ((HasThread)master).getName(): 127 master.getServerName().toShortString(); 128 this.choreService = 129 new ChoreService(name + ".splitLogManager."); 130 if (server.getCoordinatedStateManager() != null) { 131 SplitLogManagerCoordination coordination = getSplitLogManagerCoordination(); 132 Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>()); 133 SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions); 134 coordination.setDetails(details); 135 coordination.init(); 136 } 137 this.unassignedTimeout = 138 conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT); 139 this.timeoutMonitor = 140 new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), 141 master); 142 choreService.scheduleChore(timeoutMonitor); 143 } 144 145 private SplitLogManagerCoordination getSplitLogManagerCoordination() { 146 return server.getCoordinatedStateManager().getSplitLogManagerCoordination(); 147 } 148 149 private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException { 150 return getFileList(conf, logDirs, filter); 151 } 152 153 /** 154 * Get a list of paths that need to be split given a set of server-specific directories and 155 * optionally a filter. 156 * 157 * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory 158 * layout. 159 * 160 * Should be package-private, but is needed by 161 * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, 162 * Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests. 163 */ 164 @VisibleForTesting 165 public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs, 166 final PathFilter filter) 167 throws IOException { 168 List<FileStatus> fileStatus = new ArrayList<>(); 169 for (Path logDir : logDirs) { 170 final FileSystem fs = logDir.getFileSystem(conf); 171 if (!fs.exists(logDir)) { 172 LOG.warn(logDir + " doesn't exist. Nothing to do!"); 173 continue; 174 } 175 FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter); 176 if (logfiles == null || logfiles.length == 0) { 177 LOG.info(logDir + " is empty dir, no logs to split"); 178 } else { 179 Collections.addAll(fileStatus, logfiles); 180 } 181 } 182 FileStatus[] a = new FileStatus[fileStatus.size()]; 183 return fileStatus.toArray(a); 184 } 185 186 /** 187 * @param logDir one region sever wal dir path in .logs 188 * @throws IOException if there was an error while splitting any log file 189 * @return cumulative size of the logfiles split 190 * @throws IOException 191 */ 192 public long splitLogDistributed(final Path logDir) throws IOException { 193 List<Path> logDirs = new ArrayList<>(); 194 logDirs.add(logDir); 195 return splitLogDistributed(logDirs); 196 } 197 198 /** 199 * The caller will block until all the log files of the given region server have been processed - 200 * successfully split or an error is encountered - by an available worker region server. This 201 * method must only be called after the region servers have been brought online. 202 * @param logDirs List of log dirs to split 203 * @throws IOException If there was an error while splitting any log file 204 * @return cumulative size of the logfiles split 205 */ 206 public long splitLogDistributed(final List<Path> logDirs) throws IOException { 207 if (logDirs.isEmpty()) { 208 return 0; 209 } 210 Set<ServerName> serverNames = new HashSet<>(); 211 for (Path logDir : logDirs) { 212 try { 213 ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir); 214 if (serverName != null) { 215 serverNames.add(serverName); 216 } 217 } catch (IllegalArgumentException e) { 218 // ignore invalid format error. 219 LOG.warn("Cannot parse server name from " + logDir); 220 } 221 } 222 return splitLogDistributed(serverNames, logDirs, null); 223 } 224 225 /** 226 * The caller will block until all the hbase:meta log files of the given region server have been 227 * processed - successfully split or an error is encountered - by an available worker region 228 * server. This method must only be called after the region servers have been brought online. 229 * @param logDirs List of log dirs to split 230 * @param filter the Path filter to select specific files for considering 231 * @throws IOException If there was an error while splitting any log file 232 * @return cumulative size of the logfiles split 233 */ 234 public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs, 235 PathFilter filter) throws IOException { 236 MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " + 237 logDirs + " for serverName=" + serverNames); 238 FileStatus[] logfiles = getFileList(logDirs, filter); 239 status.setStatus("Checking directory contents..."); 240 SplitLogCounters.tot_mgr_log_split_batch_start.increment(); 241 LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + 242 " for " + serverNames); 243 long t = EnvironmentEdgeManager.currentTime(); 244 long totalSize = 0; 245 TaskBatch batch = new TaskBatch(); 246 for (FileStatus lf : logfiles) { 247 // TODO If the log file is still being written to - which is most likely 248 // the case for the last log file - then its length will show up here 249 // as zero. The size of such a file can only be retrieved after 250 // recover-lease is done. totalSize will be under in most cases and the 251 // metrics that it drives will also be under-reported. 252 totalSize += lf.getLen(); 253 String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf); 254 if (!enqueueSplitTask(pathToLog, batch)) { 255 throw new IOException("duplicate log split scheduled for " + lf.getPath()); 256 } 257 } 258 waitForSplittingCompletion(batch, status); 259 260 if (batch.done != batch.installed) { 261 batch.isDead = true; 262 SplitLogCounters.tot_mgr_log_split_batch_err.increment(); 263 LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed 264 + " but only " + batch.done + " done"); 265 String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; 266 status.abort(msg); 267 throw new IOException(msg); 268 } 269 for (Path logDir : logDirs) { 270 status.setStatus("Cleaning up log directory..."); 271 final FileSystem fs = logDir.getFileSystem(conf); 272 try { 273 if (fs.exists(logDir) && !fs.delete(logDir, false)) { 274 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir); 275 } 276 } catch (IOException ioe) { 277 FileStatus[] files = fs.listStatus(logDir); 278 if (files != null && files.length > 0) { 279 LOG.warn("Returning success without actually splitting and " 280 + "deleting all the log files in path " + logDir + ": " 281 + Arrays.toString(files), ioe); 282 } else { 283 LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); 284 } 285 } 286 SplitLogCounters.tot_mgr_log_split_batch_success.increment(); 287 } 288 String msg = 289 "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed 290 + " log files in " + logDirs + " in " 291 + (EnvironmentEdgeManager.currentTime() - t) + "ms"; 292 status.markComplete(msg); 293 LOG.info(msg); 294 return totalSize; 295 } 296 297 /** 298 * Add a task entry to coordination if it is not already there. 299 * @param taskname the path of the log to be split 300 * @param batch the batch this task belongs to 301 * @return true if a new entry is created, false if it is already there. 302 */ 303 boolean enqueueSplitTask(String taskname, TaskBatch batch) { 304 lastTaskCreateTime = EnvironmentEdgeManager.currentTime(); 305 String task = getSplitLogManagerCoordination().prepareTask(taskname); 306 Task oldtask = createTaskIfAbsent(task, batch); 307 if (oldtask == null) { 308 // publish the task in the coordination engine 309 getSplitLogManagerCoordination().submitTask(task); 310 return true; 311 } 312 return false; 313 } 314 315 private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) { 316 synchronized (batch) { 317 while ((batch.done + batch.error) != batch.installed) { 318 try { 319 status.setStatus("Waiting for distributed tasks to finish. " + " scheduled=" 320 + batch.installed + " done=" + batch.done + " error=" + batch.error); 321 int remaining = batch.installed - (batch.done + batch.error); 322 int actual = activeTasks(batch); 323 if (remaining != actual) { 324 LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual); 325 } 326 int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination(); 327 if (remainingTasks >= 0 && actual > remainingTasks) { 328 LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are " 329 + remainingTasks); 330 } 331 if (remainingTasks == 0 || actual == 0) { 332 LOG.warn("No more task remaining, splitting " 333 + "should have completed. Remaining tasks is " + remainingTasks 334 + ", active tasks in map " + actual); 335 if (remainingTasks == 0 && actual == 0) { 336 return; 337 } 338 } 339 batch.wait(100); 340 if (server.isStopped()) { 341 LOG.warn("Stopped while waiting for log splits to be completed"); 342 return; 343 } 344 } catch (InterruptedException e) { 345 LOG.warn("Interrupted while waiting for log splits to be completed"); 346 Thread.currentThread().interrupt(); 347 return; 348 } 349 } 350 } 351 } 352 353 @VisibleForTesting 354 ConcurrentMap<String, Task> getTasks() { 355 return tasks; 356 } 357 358 private int activeTasks(final TaskBatch batch) { 359 int count = 0; 360 for (Task t : tasks.values()) { 361 if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) { 362 count++; 363 } 364 } 365 return count; 366 367 } 368 369 /** 370 * @param path 371 * @param batch 372 * @return null on success, existing task on error 373 */ 374 private Task createTaskIfAbsent(String path, TaskBatch batch) { 375 Task oldtask; 376 // batch.installed is only changed via this function and 377 // a single thread touches batch.installed. 378 Task newtask = new Task(); 379 newtask.batch = batch; 380 oldtask = tasks.putIfAbsent(path, newtask); 381 if (oldtask == null) { 382 batch.installed++; 383 return null; 384 } 385 // new task was not used. 386 synchronized (oldtask) { 387 if (oldtask.isOrphan()) { 388 if (oldtask.status == SUCCESS) { 389 // The task is already done. Do not install the batch for this 390 // task because it might be too late for setDone() to update 391 // batch.done. There is no need for the batch creator to wait for 392 // this task to complete. 393 return (null); 394 } 395 if (oldtask.status == IN_PROGRESS) { 396 oldtask.batch = batch; 397 batch.installed++; 398 LOG.debug("Previously orphan task " + path + " is now being waited upon"); 399 return null; 400 } 401 while (oldtask.status == FAILURE) { 402 LOG.debug("wait for status of task " + path + " to change to DELETED"); 403 SplitLogCounters.tot_mgr_wait_for_zk_delete.increment(); 404 try { 405 oldtask.wait(); 406 } catch (InterruptedException e) { 407 Thread.currentThread().interrupt(); 408 LOG.warn("Interrupted when waiting for znode delete callback"); 409 // fall through to return failure 410 break; 411 } 412 } 413 if (oldtask.status != DELETED) { 414 LOG.warn("Failure because previously failed task" 415 + " state still present. Waiting for znode delete callback" + " path=" + path); 416 return oldtask; 417 } 418 // reinsert the newTask and it must succeed this time 419 Task t = tasks.putIfAbsent(path, newtask); 420 if (t == null) { 421 batch.installed++; 422 return null; 423 } 424 LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map"); 425 assert false : "Deleted task still present in tasks map"; 426 return t; 427 } 428 LOG.warn("Failure because two threads can't wait for the same task; path=" + path); 429 return oldtask; 430 } 431 } 432 433 public void stop() { 434 if (choreService != null) { 435 choreService.shutdown(); 436 } 437 if (timeoutMonitor != null) { 438 timeoutMonitor.cancel(true); 439 } 440 } 441 442 void handleDeadWorker(ServerName workerName) { 443 // resubmit the tasks on the TimeoutMonitor thread. Makes it easier 444 // to reason about concurrency. Makes it easier to retry. 445 synchronized (deadWorkersLock) { 446 if (deadWorkers == null) { 447 deadWorkers = new HashSet<>(100); 448 } 449 deadWorkers.add(workerName); 450 } 451 LOG.debug("Dead splitlog worker {}", workerName); 452 } 453 454 void handleDeadWorkers(Set<ServerName> serverNames) { 455 synchronized (deadWorkersLock) { 456 if (deadWorkers == null) { 457 deadWorkers = new HashSet<>(100); 458 } 459 deadWorkers.addAll(serverNames); 460 } 461 LOG.debug("Dead splitlog workers {}", serverNames); 462 } 463 464 /** 465 * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed(). 466 * Clients threads use this object to wait for all their tasks to be done. 467 * <p> 468 * All access is synchronized. 469 */ 470 @InterfaceAudience.Private 471 public static class TaskBatch { 472 public int installed = 0; 473 public int done = 0; 474 public int error = 0; 475 public volatile boolean isDead = false; 476 477 @Override 478 public String toString() { 479 return ("installed = " + installed + " done = " + done + " error = " + error); 480 } 481 } 482 483 /** 484 * in memory state of an active task. 485 */ 486 @InterfaceAudience.Private 487 public static class Task { 488 public volatile long last_update; 489 public volatile int last_version; 490 public volatile ServerName cur_worker_name; 491 public volatile TaskBatch batch; 492 public volatile TerminationStatus status; 493 public volatile AtomicInteger incarnation = new AtomicInteger(0); 494 public final AtomicInteger unforcedResubmits = new AtomicInteger(); 495 public volatile boolean resubmitThresholdReached; 496 497 @Override 498 public String toString() { 499 return ("last_update = " + last_update + " last_version = " + last_version 500 + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = " 501 + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch); 502 } 503 504 public Task() { 505 last_version = -1; 506 status = IN_PROGRESS; 507 setUnassigned(); 508 } 509 510 public boolean isOrphan() { 511 return (batch == null || batch.isDead); 512 } 513 514 public boolean isUnassigned() { 515 return (cur_worker_name == null); 516 } 517 518 public void heartbeatNoDetails(long time) { 519 last_update = time; 520 } 521 522 public void heartbeat(long time, int version, ServerName worker) { 523 last_version = version; 524 last_update = time; 525 cur_worker_name = worker; 526 } 527 528 public void setUnassigned() { 529 cur_worker_name = null; 530 last_update = -1; 531 } 532 } 533 534 /** 535 * Periodically checks all active tasks and resubmits the ones that have timed out 536 */ 537 private class TimeoutMonitor extends ScheduledChore { 538 private long lastLog = 0; 539 540 public TimeoutMonitor(final int period, Stoppable stopper) { 541 super("SplitLogManager Timeout Monitor", stopper, period); 542 } 543 544 @Override 545 protected void chore() { 546 if (server.getCoordinatedStateManager() == null) return; 547 548 int resubmitted = 0; 549 int unassigned = 0; 550 int tot = 0; 551 boolean found_assigned_task = false; 552 Set<ServerName> localDeadWorkers; 553 554 synchronized (deadWorkersLock) { 555 localDeadWorkers = deadWorkers; 556 deadWorkers = null; 557 } 558 559 for (Map.Entry<String, Task> e : tasks.entrySet()) { 560 String path = e.getKey(); 561 Task task = e.getValue(); 562 ServerName cur_worker = task.cur_worker_name; 563 tot++; 564 // don't easily resubmit a task which hasn't been picked up yet. It 565 // might be a long while before a SplitLogWorker is free to pick up a 566 // task. This is because a SplitLogWorker picks up a task one at a 567 // time. If we want progress when there are no region servers then we 568 // will have to run a SplitLogWorker thread in the Master. 569 if (task.isUnassigned()) { 570 unassigned++; 571 continue; 572 } 573 found_assigned_task = true; 574 if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { 575 SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment(); 576 if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { 577 resubmitted++; 578 } else { 579 handleDeadWorker(cur_worker); 580 LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker 581 + ", will retry."); 582 } 583 } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) { 584 resubmitted++; 585 } 586 } 587 if (tot > 0) { 588 long now = EnvironmentEdgeManager.currentTime(); 589 if (now > lastLog + 5000) { 590 lastLog = now; 591 LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks); 592 } 593 } 594 if (resubmitted > 0) { 595 LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks"); 596 } 597 // If there are pending tasks and all of them have been unassigned for 598 // some time then put up a RESCAN node to ping the workers. 599 // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes 600 // because a. it is very unlikely that every worker had a 601 // transient error when trying to grab the task b. if there are no 602 // workers then all tasks wills stay unassigned indefinitely and the 603 // manager will be indefinitely creating RESCAN nodes. TODO may be the 604 // master should spawn both a manager and a worker thread to guarantee 605 // that there is always one worker in the system 606 if (tot > 0 607 && !found_assigned_task 608 && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) { 609 for (Map.Entry<String, Task> e : tasks.entrySet()) { 610 String key = e.getKey(); 611 Task task = e.getValue(); 612 // we have to do task.isUnassigned() check again because tasks might 613 // have been asynchronously assigned. There is no locking required 614 // for these checks ... it is OK even if tryGetDataSetWatch() is 615 // called unnecessarily for a taskpath 616 if (task.isUnassigned() && (task.status != FAILURE)) { 617 // We just touch the znode to make sure its still there 618 getSplitLogManagerCoordination().checkTaskStillAvailable(key); 619 } 620 } 621 getSplitLogManagerCoordination().checkTasks(); 622 SplitLogCounters.tot_mgr_resubmit_unassigned.increment(); 623 LOG.debug("resubmitting unassigned task(s) after timeout"); 624 } 625 Set<String> failedDeletions = 626 getSplitLogManagerCoordination().getDetails().getFailedDeletions(); 627 // Retry previously failed deletes 628 if (failedDeletions.size() > 0) { 629 List<String> tmpPaths = new ArrayList<>(failedDeletions); 630 for (String tmpPath : tmpPaths) { 631 // deleteNode is an async call 632 getSplitLogManagerCoordination().deleteTask(tmpPath); 633 } 634 failedDeletions.removeAll(tmpPaths); 635 } 636 } 637 } 638 639 public enum ResubmitDirective { 640 CHECK(), FORCE() 641 } 642 643 public enum TerminationStatus { 644 IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted"); 645 646 final String statusMsg; 647 648 TerminationStatus(String msg) { 649 statusMsg = msg; 650 } 651 652 @Override 653 public String toString() { 654 return statusMsg; 655 } 656 } 657}