001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.PathFilter;
044import org.apache.hadoop.hbase.ChoreService;
045import org.apache.hadoop.hbase.ScheduledChore;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.Stoppable;
049import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
050import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
051import org.apache.hadoop.hbase.log.HBaseMarkers;
052import org.apache.hadoop.hbase.monitoring.MonitoredTask;
053import org.apache.hadoop.hbase.monitoring.TaskMonitor;
054import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
055import org.apache.hadoop.hbase.util.FSUtils;
056import org.apache.hadoop.hbase.util.HasThread;
057import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
058import org.apache.yetus.audience.InterfaceAudience;
059import org.slf4j.Logger;
060import org.slf4j.LoggerFactory;
061import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
062
063/**
064 * Distributes the task of log splitting to the available region servers.
065 * Coordination happens via coordination engine. For every log file that has to be split a
066 * task is created. SplitLogWorkers race to grab a task.
067 *
068 * <p>SplitLogManager monitors the tasks that it creates using the
069 * timeoutMonitor thread. If a task's progress is slow then
070 * {@link SplitLogManagerCoordination#checkTasks} will take away the
071 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
072 * and the task will be up for grabs again. When the task is done then it is
073 * deleted by SplitLogManager.
074 *
075 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
076 * log files. The caller thread waits in this method until all the log files
077 * have been split.
078 *
079 * <p>All the coordination calls made by this class are asynchronous. This is mainly
080 * to help reduce response time seen by the callers.
081 *
082 * <p>There is a race in this design between the SplitLogManager and the
083 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
084 * already been completed by a SplitLogWorker. We rely on the idempotency of
085 * the log splitting task for correctness.
086 *
087 * <p>It is also assumed that every log splitting task is unique and once
088 * completed (either with success or with error) it will not be submitted
089 * again. If a task is resubmitted then there is a risk that an old "delete task"
090 * can delete the re-submission.
091 */
092@InterfaceAudience.Private
093public class SplitLogManager {
094  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
095
096  private final MasterServices server;
097
098  private final Configuration conf;
099  private final ChoreService choreService;
100
101  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
102
103  private long unassignedTimeout;
104  private long lastTaskCreateTime = Long.MAX_VALUE;
105
106  @VisibleForTesting
107  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
108  private TimeoutMonitor timeoutMonitor;
109
110  private volatile Set<ServerName> deadWorkers = null;
111  private final Object deadWorkersLock = new Object();
112
113  /**
114   * Its OK to construct this object even when region-servers are not online. It does lookup the
115   * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
116   * @param master the master services
117   * @param conf the HBase configuration
118   * @throws IOException
119   */
120  public SplitLogManager(MasterServices master, Configuration conf)
121      throws IOException {
122    this.server = master;
123    this.conf = conf;
124    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
125    // For example, in tests.
126    String name = master instanceof HasThread? ((HasThread)master).getName():
127        master.getServerName().toShortString();
128    this.choreService =
129        new ChoreService(name + ".splitLogManager.");
130    if (server.getCoordinatedStateManager() != null) {
131      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
132      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
133      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
134      coordination.setDetails(details);
135      coordination.init();
136    }
137    this.unassignedTimeout =
138        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
139    this.timeoutMonitor =
140        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
141            master);
142    choreService.scheduleChore(timeoutMonitor);
143  }
144
145  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
146    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
147  }
148
149  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
150    return getFileList(conf, logDirs, filter);
151  }
152
153  /**
154   * Get a list of paths that need to be split given a set of server-specific directories and
155   * optionally  a filter.
156   *
157   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
158   * layout.
159   *
160   * Should be package-private, but is needed by
161   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
162   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
163   */
164  @VisibleForTesting
165  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
166      final PathFilter filter)
167      throws IOException {
168    List<FileStatus> fileStatus = new ArrayList<>();
169    for (Path logDir : logDirs) {
170      final FileSystem fs = logDir.getFileSystem(conf);
171      if (!fs.exists(logDir)) {
172        LOG.warn(logDir + " doesn't exist. Nothing to do!");
173        continue;
174      }
175      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
176      if (logfiles == null || logfiles.length == 0) {
177        LOG.info(logDir + " is empty dir, no logs to split");
178      } else {
179        Collections.addAll(fileStatus, logfiles);
180      }
181    }
182    FileStatus[] a = new FileStatus[fileStatus.size()];
183    return fileStatus.toArray(a);
184  }
185
186  /**
187   * @param logDir one region sever wal dir path in .logs
188   * @throws IOException if there was an error while splitting any log file
189   * @return cumulative size of the logfiles split
190   * @throws IOException
191   */
192  public long splitLogDistributed(final Path logDir) throws IOException {
193    List<Path> logDirs = new ArrayList<>();
194    logDirs.add(logDir);
195    return splitLogDistributed(logDirs);
196  }
197
198  /**
199   * The caller will block until all the log files of the given region server have been processed -
200   * successfully split or an error is encountered - by an available worker region server. This
201   * method must only be called after the region servers have been brought online.
202   * @param logDirs List of log dirs to split
203   * @throws IOException If there was an error while splitting any log file
204   * @return cumulative size of the logfiles split
205   */
206  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
207    if (logDirs.isEmpty()) {
208      return 0;
209    }
210    Set<ServerName> serverNames = new HashSet<>();
211    for (Path logDir : logDirs) {
212      try {
213        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
214        if (serverName != null) {
215          serverNames.add(serverName);
216        }
217      } catch (IllegalArgumentException e) {
218        // ignore invalid format error.
219        LOG.warn("Cannot parse server name from " + logDir);
220      }
221    }
222    return splitLogDistributed(serverNames, logDirs, null);
223  }
224
225  /**
226   * The caller will block until all the hbase:meta log files of the given region server have been
227   * processed - successfully split or an error is encountered - by an available worker region
228   * server. This method must only be called after the region servers have been brought online.
229   * @param logDirs List of log dirs to split
230   * @param filter the Path filter to select specific files for considering
231   * @throws IOException If there was an error while splitting any log file
232   * @return cumulative size of the logfiles split
233   */
234  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
235      PathFilter filter) throws IOException {
236    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
237      logDirs + " for serverName=" + serverNames);
238    FileStatus[] logfiles = getFileList(logDirs, filter);
239    status.setStatus("Checking directory contents...");
240    SplitLogCounters.tot_mgr_log_split_batch_start.increment();
241    LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
242      " for " + serverNames);
243    long t = EnvironmentEdgeManager.currentTime();
244    long totalSize = 0;
245    TaskBatch batch = new TaskBatch();
246    for (FileStatus lf : logfiles) {
247      // TODO If the log file is still being written to - which is most likely
248      // the case for the last log file - then its length will show up here
249      // as zero. The size of such a file can only be retrieved after
250      // recover-lease is done. totalSize will be under in most cases and the
251      // metrics that it drives will also be under-reported.
252      totalSize += lf.getLen();
253      String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
254      if (!enqueueSplitTask(pathToLog, batch)) {
255        throw new IOException("duplicate log split scheduled for " + lf.getPath());
256      }
257    }
258    waitForSplittingCompletion(batch, status);
259
260    if (batch.done != batch.installed) {
261      batch.isDead = true;
262      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
263      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
264          + " but only " + batch.done + " done");
265      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
266      status.abort(msg);
267      throw new IOException(msg);
268    }
269    for (Path logDir : logDirs) {
270      status.setStatus("Cleaning up log directory...");
271      final FileSystem fs = logDir.getFileSystem(conf);
272      try {
273        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
274          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
275        }
276      } catch (IOException ioe) {
277        FileStatus[] files = fs.listStatus(logDir);
278        if (files != null && files.length > 0) {
279          LOG.warn("Returning success without actually splitting and "
280              + "deleting all the log files in path " + logDir + ": "
281              + Arrays.toString(files), ioe);
282        } else {
283          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
284        }
285      }
286      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
287    }
288    String msg =
289        "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
290            + " log files in " + logDirs + " in "
291            + (EnvironmentEdgeManager.currentTime() - t) + "ms";
292    status.markComplete(msg);
293    LOG.info(msg);
294    return totalSize;
295  }
296
297  /**
298   * Add a task entry to coordination if it is not already there.
299   * @param taskname the path of the log to be split
300   * @param batch the batch this task belongs to
301   * @return true if a new entry is created, false if it is already there.
302   */
303  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
304    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
305    String task = getSplitLogManagerCoordination().prepareTask(taskname);
306    Task oldtask = createTaskIfAbsent(task, batch);
307    if (oldtask == null) {
308      // publish the task in the coordination engine
309      getSplitLogManagerCoordination().submitTask(task);
310      return true;
311    }
312    return false;
313  }
314
315  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
316    synchronized (batch) {
317      while ((batch.done + batch.error) != batch.installed) {
318        try {
319          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
320              + batch.installed + " done=" + batch.done + " error=" + batch.error);
321          int remaining = batch.installed - (batch.done + batch.error);
322          int actual = activeTasks(batch);
323          if (remaining != actual) {
324            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
325          }
326          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
327          if (remainingTasks >= 0 && actual > remainingTasks) {
328            LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
329                + remainingTasks);
330          }
331          if (remainingTasks == 0 || actual == 0) {
332            LOG.warn("No more task remaining, splitting "
333                + "should have completed. Remaining tasks is " + remainingTasks
334                + ", active tasks in map " + actual);
335            if (remainingTasks == 0 && actual == 0) {
336              return;
337            }
338          }
339          batch.wait(100);
340          if (server.isStopped()) {
341            LOG.warn("Stopped while waiting for log splits to be completed");
342            return;
343          }
344        } catch (InterruptedException e) {
345          LOG.warn("Interrupted while waiting for log splits to be completed");
346          Thread.currentThread().interrupt();
347          return;
348        }
349      }
350    }
351  }
352
353  @VisibleForTesting
354  ConcurrentMap<String, Task> getTasks() {
355    return tasks;
356  }
357
358  private int activeTasks(final TaskBatch batch) {
359    int count = 0;
360    for (Task t : tasks.values()) {
361      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
362        count++;
363      }
364    }
365    return count;
366
367  }
368
369  /**
370   * @param path
371   * @param batch
372   * @return null on success, existing task on error
373   */
374  private Task createTaskIfAbsent(String path, TaskBatch batch) {
375    Task oldtask;
376    // batch.installed is only changed via this function and
377    // a single thread touches batch.installed.
378    Task newtask = new Task();
379    newtask.batch = batch;
380    oldtask = tasks.putIfAbsent(path, newtask);
381    if (oldtask == null) {
382      batch.installed++;
383      return null;
384    }
385    // new task was not used.
386    synchronized (oldtask) {
387      if (oldtask.isOrphan()) {
388        if (oldtask.status == SUCCESS) {
389          // The task is already done. Do not install the batch for this
390          // task because it might be too late for setDone() to update
391          // batch.done. There is no need for the batch creator to wait for
392          // this task to complete.
393          return (null);
394        }
395        if (oldtask.status == IN_PROGRESS) {
396          oldtask.batch = batch;
397          batch.installed++;
398          LOG.debug("Previously orphan task " + path + " is now being waited upon");
399          return null;
400        }
401        while (oldtask.status == FAILURE) {
402          LOG.debug("wait for status of task " + path + " to change to DELETED");
403          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
404          try {
405            oldtask.wait();
406          } catch (InterruptedException e) {
407            Thread.currentThread().interrupt();
408            LOG.warn("Interrupted when waiting for znode delete callback");
409            // fall through to return failure
410            break;
411          }
412        }
413        if (oldtask.status != DELETED) {
414          LOG.warn("Failure because previously failed task"
415              + " state still present. Waiting for znode delete callback" + " path=" + path);
416          return oldtask;
417        }
418        // reinsert the newTask and it must succeed this time
419        Task t = tasks.putIfAbsent(path, newtask);
420        if (t == null) {
421          batch.installed++;
422          return null;
423        }
424        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
425        assert false : "Deleted task still present in tasks map";
426        return t;
427      }
428      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
429      return oldtask;
430    }
431  }
432
433  public void stop() {
434    if (choreService != null) {
435      choreService.shutdown();
436    }
437    if (timeoutMonitor != null) {
438      timeoutMonitor.cancel(true);
439    }
440  }
441
442  void handleDeadWorker(ServerName workerName) {
443    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
444    // to reason about concurrency. Makes it easier to retry.
445    synchronized (deadWorkersLock) {
446      if (deadWorkers == null) {
447        deadWorkers = new HashSet<>(100);
448      }
449      deadWorkers.add(workerName);
450    }
451    LOG.debug("Dead splitlog worker {}", workerName);
452  }
453
454  void handleDeadWorkers(Set<ServerName> serverNames) {
455    synchronized (deadWorkersLock) {
456      if (deadWorkers == null) {
457        deadWorkers = new HashSet<>(100);
458      }
459      deadWorkers.addAll(serverNames);
460    }
461    LOG.debug("Dead splitlog workers {}", serverNames);
462  }
463
464  /**
465   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
466   * Clients threads use this object to wait for all their tasks to be done.
467   * <p>
468   * All access is synchronized.
469   */
470  @InterfaceAudience.Private
471  public static class TaskBatch {
472    public int installed = 0;
473    public int done = 0;
474    public int error = 0;
475    public volatile boolean isDead = false;
476
477    @Override
478    public String toString() {
479      return ("installed = " + installed + " done = " + done + " error = " + error);
480    }
481  }
482
483  /**
484   * in memory state of an active task.
485   */
486  @InterfaceAudience.Private
487  public static class Task {
488    public volatile long last_update;
489    public volatile int last_version;
490    public volatile ServerName cur_worker_name;
491    public volatile TaskBatch batch;
492    public volatile TerminationStatus status;
493    public volatile AtomicInteger incarnation = new AtomicInteger(0);
494    public final AtomicInteger unforcedResubmits = new AtomicInteger();
495    public volatile boolean resubmitThresholdReached;
496
497    @Override
498    public String toString() {
499      return ("last_update = " + last_update + " last_version = " + last_version
500          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
501          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
502    }
503
504    public Task() {
505      last_version = -1;
506      status = IN_PROGRESS;
507      setUnassigned();
508    }
509
510    public boolean isOrphan() {
511      return (batch == null || batch.isDead);
512    }
513
514    public boolean isUnassigned() {
515      return (cur_worker_name == null);
516    }
517
518    public void heartbeatNoDetails(long time) {
519      last_update = time;
520    }
521
522    public void heartbeat(long time, int version, ServerName worker) {
523      last_version = version;
524      last_update = time;
525      cur_worker_name = worker;
526    }
527
528    public void setUnassigned() {
529      cur_worker_name = null;
530      last_update = -1;
531    }
532  }
533
534  /**
535   * Periodically checks all active tasks and resubmits the ones that have timed out
536   */
537  private class TimeoutMonitor extends ScheduledChore {
538    private long lastLog = 0;
539
540    public TimeoutMonitor(final int period, Stoppable stopper) {
541      super("SplitLogManager Timeout Monitor", stopper, period);
542    }
543
544    @Override
545    protected void chore() {
546      if (server.getCoordinatedStateManager() == null) return;
547
548      int resubmitted = 0;
549      int unassigned = 0;
550      int tot = 0;
551      boolean found_assigned_task = false;
552      Set<ServerName> localDeadWorkers;
553
554      synchronized (deadWorkersLock) {
555        localDeadWorkers = deadWorkers;
556        deadWorkers = null;
557      }
558
559      for (Map.Entry<String, Task> e : tasks.entrySet()) {
560        String path = e.getKey();
561        Task task = e.getValue();
562        ServerName cur_worker = task.cur_worker_name;
563        tot++;
564        // don't easily resubmit a task which hasn't been picked up yet. It
565        // might be a long while before a SplitLogWorker is free to pick up a
566        // task. This is because a SplitLogWorker picks up a task one at a
567        // time. If we want progress when there are no region servers then we
568        // will have to run a SplitLogWorker thread in the Master.
569        if (task.isUnassigned()) {
570          unassigned++;
571          continue;
572        }
573        found_assigned_task = true;
574        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
575          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
576          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
577            resubmitted++;
578          } else {
579            handleDeadWorker(cur_worker);
580            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
581                + ", will retry.");
582          }
583        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
584          resubmitted++;
585        }
586      }
587      if (tot > 0) {
588        long now = EnvironmentEdgeManager.currentTime();
589        if (now > lastLog + 5000) {
590          lastLog = now;
591          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
592        }
593      }
594      if (resubmitted > 0) {
595        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
596      }
597      // If there are pending tasks and all of them have been unassigned for
598      // some time then put up a RESCAN node to ping the workers.
599      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
600      // because a. it is very unlikely that every worker had a
601      // transient error when trying to grab the task b. if there are no
602      // workers then all tasks wills stay unassigned indefinitely and the
603      // manager will be indefinitely creating RESCAN nodes. TODO may be the
604      // master should spawn both a manager and a worker thread to guarantee
605      // that there is always one worker in the system
606      if (tot > 0
607          && !found_assigned_task
608          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
609        for (Map.Entry<String, Task> e : tasks.entrySet()) {
610          String key = e.getKey();
611          Task task = e.getValue();
612          // we have to do task.isUnassigned() check again because tasks might
613          // have been asynchronously assigned. There is no locking required
614          // for these checks ... it is OK even if tryGetDataSetWatch() is
615          // called unnecessarily for a taskpath
616          if (task.isUnassigned() && (task.status != FAILURE)) {
617            // We just touch the znode to make sure its still there
618            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
619          }
620        }
621        getSplitLogManagerCoordination().checkTasks();
622        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
623        LOG.debug("resubmitting unassigned task(s) after timeout");
624      }
625      Set<String> failedDeletions =
626        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
627      // Retry previously failed deletes
628      if (failedDeletions.size() > 0) {
629        List<String> tmpPaths = new ArrayList<>(failedDeletions);
630        for (String tmpPath : tmpPaths) {
631          // deleteNode is an async call
632          getSplitLogManagerCoordination().deleteTask(tmpPath);
633        }
634        failedDeletions.removeAll(tmpPaths);
635      }
636    }
637  }
638
/**
 * Directive handed to the coordination when a task is resubmitted. In this class
 * {@code FORCE} is used for tasks owned by a worker reported dead and {@code CHECK} for all
 * other assigned tasks.
 */
public enum ResubmitDirective {
  CHECK,
  FORCE
}
642
/**
 * Lifecycle states of a split task; {@link #toString()} yields the lower-case status message.
 */
public enum TerminationStatus {
  IN_PROGRESS("in_progress"),
  SUCCESS("success"),
  FAILURE("failure"),
  DELETED("deleted");

  /** Text returned by {@link #toString()}. */
  final String statusMsg;

  TerminationStatus(String msg) {
    this.statusMsg = msg;
  }

  @Override
  public String toString() {
    return this.statusMsg;
  }
}
657}