001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.concurrent.atomic.AtomicInteger;
038
039import org.apache.hadoop.conf.Configuration;
040import org.apache.hadoop.fs.FileStatus;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.fs.PathFilter;
044import org.apache.hadoop.hbase.ChoreService;
045import org.apache.hadoop.hbase.ScheduledChore;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.SplitLogCounters;
048import org.apache.hadoop.hbase.Stoppable;
049import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
050import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
051import org.apache.hadoop.hbase.log.HBaseMarkers;
052import org.apache.hadoop.hbase.monitoring.MonitoredTask;
053import org.apache.hadoop.hbase.monitoring.TaskMonitor;
054import org.apache.hadoop.hbase.procedure2.util.StringUtils;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.util.FSUtils;
057import org.apache.hadoop.hbase.util.HasThread;
058import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
059import org.apache.yetus.audience.InterfaceAudience;
060import org.slf4j.Logger;
061import org.slf4j.LoggerFactory;
062import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
063
064/**
065 * Distributes the task of log splitting to the available region servers.
066 * Coordination happens via coordination engine. For every log file that has to be split a
067 * task is created. SplitLogWorkers race to grab a task.
068 *
069 * <p>SplitLogManager monitors the tasks that it creates using the
070 * timeoutMonitor thread. If a task's progress is slow then
071 * {@link SplitLogManagerCoordination#checkTasks} will take away the
072 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
073 * and the task will be up for grabs again. When the task is done then it is
074 * deleted by SplitLogManager.
075 *
076 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
077 * log files. The caller thread waits in this method until all the log files
078 * have been split.
079 *
080 * <p>All the coordination calls made by this class are asynchronous. This is mainly
081 * to help reduce response time seen by the callers.
082 *
083 * <p>There is race in this design between the SplitLogManager and the
084 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
085 * already been completed by a SplitLogWorker. We rely on the idempotency of
086 * the log splitting task for correctness.
087 *
088 * <p>It is also assumed that every log splitting task is unique and once
089 * completed (either with success or with error) it will be not be submitted
090 * again. If a task is resubmitted then there is a risk that old "delete task"
091 * can delete the re-submission.
092 */
093@InterfaceAudience.Private
094public class SplitLogManager {
095  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
096
097  private final MasterServices server;
098
099  private final Configuration conf;
100  private final ChoreService choreService;
101
102  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
103
104  private long unassignedTimeout;
105  private long lastTaskCreateTime = Long.MAX_VALUE;
106
107  @VisibleForTesting
108  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
109  private TimeoutMonitor timeoutMonitor;
110
111  private volatile Set<ServerName> deadWorkers = null;
112  private final Object deadWorkersLock = new Object();
113
114  /**
115   * Its OK to construct this object even when region-servers are not online. It does lookup the
116   * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
117   * @param master the master services
118   * @param conf the HBase configuration
119   * @throws IOException
120   */
121  public SplitLogManager(MasterServices master, Configuration conf)
122      throws IOException {
123    this.server = master;
124    this.conf = conf;
125    // Get Server Thread name. Sometimes the Server is mocked so may not implement HasThread.
126    // For example, in tests.
127    String name = master instanceof HasThread? ((HasThread)master).getName():
128        master.getServerName().toShortString();
129    this.choreService =
130        new ChoreService(name + ".splitLogManager.");
131    if (server.getCoordinatedStateManager() != null) {
132      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
133      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
134      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
135      coordination.setDetails(details);
136      coordination.init();
137    }
138    this.unassignedTimeout =
139        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
140    this.timeoutMonitor =
141        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
142            master);
143    choreService.scheduleChore(timeoutMonitor);
144  }
145
146  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
147    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
148  }
149
150  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
151    return getFileList(conf, logDirs, filter);
152  }
153
154  /**
155   * Get a list of paths that need to be split given a set of server-specific directories and
156   * optionally  a filter.
157   *
158   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
159   * layout.
160   *
161   * Should be package-private, but is needed by
162   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
163   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
164   */
165  @VisibleForTesting
166  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
167      final PathFilter filter)
168      throws IOException {
169    List<FileStatus> fileStatus = new ArrayList<>();
170    for (Path logDir : logDirs) {
171      final FileSystem fs = logDir.getFileSystem(conf);
172      if (!fs.exists(logDir)) {
173        LOG.warn(logDir + " doesn't exist. Nothing to do!");
174        continue;
175      }
176      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
177      if (logfiles == null || logfiles.length == 0) {
178        LOG.info("{} dir is empty, no logs to split.", logDir);
179      } else {
180        Collections.addAll(fileStatus, logfiles);
181      }
182    }
183    FileStatus[] a = new FileStatus[fileStatus.size()];
184    return fileStatus.toArray(a);
185  }
186
187  /**
188   * @param logDir one region sever wal dir path in .logs
189   * @throws IOException if there was an error while splitting any log file
190   * @return cumulative size of the logfiles split
191   * @throws IOException
192   */
193  public long splitLogDistributed(final Path logDir) throws IOException {
194    List<Path> logDirs = new ArrayList<>();
195    logDirs.add(logDir);
196    return splitLogDistributed(logDirs);
197  }
198
199  /**
200   * The caller will block until all the log files of the given region server have been processed -
201   * successfully split or an error is encountered - by an available worker region server. This
202   * method must only be called after the region servers have been brought online.
203   * @param logDirs List of log dirs to split
204   * @throws IOException If there was an error while splitting any log file
205   * @return cumulative size of the logfiles split
206   */
207  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
208    if (logDirs.isEmpty()) {
209      return 0;
210    }
211    Set<ServerName> serverNames = new HashSet<>();
212    for (Path logDir : logDirs) {
213      try {
214        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
215        if (serverName != null) {
216          serverNames.add(serverName);
217        }
218      } catch (IllegalArgumentException e) {
219        // ignore invalid format error.
220        LOG.warn("Cannot parse server name from " + logDir);
221      }
222    }
223    return splitLogDistributed(serverNames, logDirs, null);
224  }
225
226  /**
227   * The caller will block until all the hbase:meta log files of the given region server have been
228   * processed - successfully split or an error is encountered - by an available worker region
229   * server. This method must only be called after the region servers have been brought online.
230   * @param logDirs List of log dirs to split
231   * @param filter the Path filter to select specific files for considering
232   * @throws IOException If there was an error while splitting any log file
233   * @return cumulative size of the logfiles split
234   */
235  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
236      PathFilter filter) throws IOException {
237    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
238      logDirs + " for serverName=" + serverNames);
239    long totalSize = 0;
240    TaskBatch batch = null;
241    long startTime = 0;
242    FileStatus[] logfiles = getFileList(logDirs, filter);
243    if (logfiles.length != 0) {
244      status.setStatus("Checking directory contents...");
245      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
246      LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
247          " for " + serverNames);
248      startTime = EnvironmentEdgeManager.currentTime();
249      batch = new TaskBatch();
250      for (FileStatus lf : logfiles) {
251        // TODO If the log file is still being written to - which is most likely
252        // the case for the last log file - then its length will show up here
253        // as zero. The size of such a file can only be retrieved after
254        // recover-lease is done. totalSize will be under in most cases and the
255        // metrics that it drives will also be under-reported.
256        totalSize += lf.getLen();
257        String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
258        if (!enqueueSplitTask(pathToLog, batch)) {
259          throw new IOException("duplicate log split scheduled for " + lf.getPath());
260        }
261      }
262      waitForSplittingCompletion(batch, status);
263    }
264
265    if (batch != null && batch.done != batch.installed) {
266      batch.isDead = true;
267      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
268      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
269          + " but only " + batch.done + " done");
270      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
271      status.abort(msg);
272      throw new IOException(msg);
273    }
274    for (Path logDir : logDirs) {
275      status.setStatus("Cleaning up log directory...");
276      final FileSystem fs = logDir.getFileSystem(conf);
277      try {
278        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
279          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
280        }
281      } catch (IOException ioe) {
282        FileStatus[] files = fs.listStatus(logDir);
283        if (files != null && files.length > 0) {
284          LOG.warn("Returning success without actually splitting and "
285              + "deleting all the log files in path " + logDir + ": "
286              + Arrays.toString(files), ioe);
287        } else {
288          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
289        }
290      }
291      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
292    }
293    String msg =
294        "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" +
295            totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " +
296            logDirs + " in " +
297            ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) +
298            "ms";
299    status.markComplete(msg);
300    LOG.info(msg);
301    return totalSize;
302  }
303
304  /**
305   * Add a task entry to coordination if it is not already there.
306   * @param taskname the path of the log to be split
307   * @param batch the batch this task belongs to
308   * @return true if a new entry is created, false if it is already there.
309   */
310  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
311    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
312    String task = getSplitLogManagerCoordination().prepareTask(taskname);
313    Task oldtask = createTaskIfAbsent(task, batch);
314    if (oldtask == null) {
315      // publish the task in the coordination engine
316      getSplitLogManagerCoordination().submitTask(task);
317      return true;
318    }
319    return false;
320  }
321
322  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
323    synchronized (batch) {
324      while ((batch.done + batch.error) != batch.installed) {
325        try {
326          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
327              + batch.installed + " done=" + batch.done + " error=" + batch.error);
328          int remaining = batch.installed - (batch.done + batch.error);
329          int actual = activeTasks(batch);
330          if (remaining != actual) {
331            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
332          }
333          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
334          if (remainingTasks >= 0 && actual > remainingTasks) {
335            LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
336                + remainingTasks);
337          }
338          if (remainingTasks == 0 || actual == 0) {
339            LOG.warn("No more task remaining, splitting "
340                + "should have completed. Remaining tasks is " + remainingTasks
341                + ", active tasks in map " + actual);
342            if (remainingTasks == 0 && actual == 0) {
343              return;
344            }
345          }
346          batch.wait(100);
347          if (server.isStopped()) {
348            LOG.warn("Stopped while waiting for log splits to be completed");
349            return;
350          }
351        } catch (InterruptedException e) {
352          LOG.warn("Interrupted while waiting for log splits to be completed");
353          Thread.currentThread().interrupt();
354          return;
355        }
356      }
357    }
358  }
359
360  @VisibleForTesting
361  ConcurrentMap<String, Task> getTasks() {
362    return tasks;
363  }
364
365  private int activeTasks(final TaskBatch batch) {
366    int count = 0;
367    for (Task t : tasks.values()) {
368      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
369        count++;
370      }
371    }
372    return count;
373
374  }
375
376  /**
377   * @param path
378   * @param batch
379   * @return null on success, existing task on error
380   */
381  private Task createTaskIfAbsent(String path, TaskBatch batch) {
382    Task oldtask;
383    // batch.installed is only changed via this function and
384    // a single thread touches batch.installed.
385    Task newtask = new Task();
386    newtask.batch = batch;
387    oldtask = tasks.putIfAbsent(path, newtask);
388    if (oldtask == null) {
389      batch.installed++;
390      return null;
391    }
392    // new task was not used.
393    synchronized (oldtask) {
394      if (oldtask.isOrphan()) {
395        if (oldtask.status == SUCCESS) {
396          // The task is already done. Do not install the batch for this
397          // task because it might be too late for setDone() to update
398          // batch.done. There is no need for the batch creator to wait for
399          // this task to complete.
400          return (null);
401        }
402        if (oldtask.status == IN_PROGRESS) {
403          oldtask.batch = batch;
404          batch.installed++;
405          LOG.debug("Previously orphan task " + path + " is now being waited upon");
406          return null;
407        }
408        while (oldtask.status == FAILURE) {
409          LOG.debug("wait for status of task " + path + " to change to DELETED");
410          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
411          try {
412            oldtask.wait();
413          } catch (InterruptedException e) {
414            Thread.currentThread().interrupt();
415            LOG.warn("Interrupted when waiting for znode delete callback");
416            // fall through to return failure
417            break;
418          }
419        }
420        if (oldtask.status != DELETED) {
421          LOG.warn("Failure because previously failed task"
422              + " state still present. Waiting for znode delete callback" + " path=" + path);
423          return oldtask;
424        }
425        // reinsert the newTask and it must succeed this time
426        Task t = tasks.putIfAbsent(path, newtask);
427        if (t == null) {
428          batch.installed++;
429          return null;
430        }
431        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
432        assert false : "Deleted task still present in tasks map";
433        return t;
434      }
435      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
436      return oldtask;
437    }
438  }
439
440  public void stop() {
441    if (choreService != null) {
442      choreService.shutdown();
443    }
444    if (timeoutMonitor != null) {
445      timeoutMonitor.cancel(true);
446    }
447  }
448
449  void handleDeadWorker(ServerName workerName) {
450    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
451    // to reason about concurrency. Makes it easier to retry.
452    synchronized (deadWorkersLock) {
453      if (deadWorkers == null) {
454        deadWorkers = new HashSet<>(100);
455      }
456      deadWorkers.add(workerName);
457    }
458    LOG.info("Dead splitlog worker {}", workerName);
459  }
460
461  void handleDeadWorkers(Set<ServerName> serverNames) {
462    synchronized (deadWorkersLock) {
463      if (deadWorkers == null) {
464        deadWorkers = new HashSet<>(100);
465      }
466      deadWorkers.addAll(serverNames);
467    }
468    LOG.info("dead splitlog workers " + serverNames);
469  }
470
471  /**
472   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
473   * Clients threads use this object to wait for all their tasks to be done.
474   * <p>
475   * All access is synchronized.
476   */
477  @InterfaceAudience.Private
478  public static class TaskBatch {
479    public int installed = 0;
480    public int done = 0;
481    public int error = 0;
482    public volatile boolean isDead = false;
483
484    @Override
485    public String toString() {
486      return ("installed = " + installed + " done = " + done + " error = " + error);
487    }
488  }
489
490  /**
491   * in memory state of an active task.
492   */
493  @InterfaceAudience.Private
494  public static class Task {
495    public volatile long last_update;
496    public volatile int last_version;
497    public volatile ServerName cur_worker_name;
498    public volatile TaskBatch batch;
499    public volatile TerminationStatus status;
500    public volatile AtomicInteger incarnation = new AtomicInteger(0);
501    public final AtomicInteger unforcedResubmits = new AtomicInteger();
502    public volatile boolean resubmitThresholdReached;
503
504    @Override
505    public String toString() {
506      return ("last_update = " + last_update + " last_version = " + last_version
507          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
508          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
509    }
510
511    public Task() {
512      last_version = -1;
513      status = IN_PROGRESS;
514      setUnassigned();
515    }
516
517    public boolean isOrphan() {
518      return (batch == null || batch.isDead);
519    }
520
521    public boolean isUnassigned() {
522      return (cur_worker_name == null);
523    }
524
525    public void heartbeatNoDetails(long time) {
526      last_update = time;
527    }
528
529    public void heartbeat(long time, int version, ServerName worker) {
530      last_version = version;
531      last_update = time;
532      cur_worker_name = worker;
533    }
534
535    public void setUnassigned() {
536      cur_worker_name = null;
537      last_update = -1;
538    }
539  }
540
541  /**
542   * Periodically checks all active tasks and resubmits the ones that have timed out
543   */
544  private class TimeoutMonitor extends ScheduledChore {
545    private long lastLog = 0;
546
547    public TimeoutMonitor(final int period, Stoppable stopper) {
548      super("SplitLogManager Timeout Monitor", stopper, period);
549    }
550
551    @Override
552    protected void chore() {
553      if (server.getCoordinatedStateManager() == null) return;
554
555      int resubmitted = 0;
556      int unassigned = 0;
557      int tot = 0;
558      boolean found_assigned_task = false;
559      Set<ServerName> localDeadWorkers;
560
561      synchronized (deadWorkersLock) {
562        localDeadWorkers = deadWorkers;
563        deadWorkers = null;
564      }
565
566      for (Map.Entry<String, Task> e : tasks.entrySet()) {
567        String path = e.getKey();
568        Task task = e.getValue();
569        ServerName cur_worker = task.cur_worker_name;
570        tot++;
571        // don't easily resubmit a task which hasn't been picked up yet. It
572        // might be a long while before a SplitLogWorker is free to pick up a
573        // task. This is because a SplitLogWorker picks up a task one at a
574        // time. If we want progress when there are no region servers then we
575        // will have to run a SplitLogWorker thread in the Master.
576        if (task.isUnassigned()) {
577          unassigned++;
578          continue;
579        }
580        found_assigned_task = true;
581        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
582          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
583          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
584            resubmitted++;
585          } else {
586            handleDeadWorker(cur_worker);
587            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
588                + ", will retry.");
589          }
590        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
591          resubmitted++;
592        }
593      }
594      if (tot > 0) {
595        long now = EnvironmentEdgeManager.currentTime();
596        if (now > lastLog + 5000) {
597          lastLog = now;
598          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
599        }
600      }
601      if (resubmitted > 0) {
602        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
603      }
604      // If there are pending tasks and all of them have been unassigned for
605      // some time then put up a RESCAN node to ping the workers.
606      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
607      // because a. it is very unlikely that every worker had a
608      // transient error when trying to grab the task b. if there are no
609      // workers then all tasks wills stay unassigned indefinitely and the
610      // manager will be indefinitely creating RESCAN nodes. TODO may be the
611      // master should spawn both a manager and a worker thread to guarantee
612      // that there is always one worker in the system
613      if (tot > 0
614          && !found_assigned_task
615          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
616        for (Map.Entry<String, Task> e : tasks.entrySet()) {
617          String key = e.getKey();
618          Task task = e.getValue();
619          // we have to do task.isUnassigned() check again because tasks might
620          // have been asynchronously assigned. There is no locking required
621          // for these checks ... it is OK even if tryGetDataSetWatch() is
622          // called unnecessarily for a taskpath
623          if (task.isUnassigned() && (task.status != FAILURE)) {
624            // We just touch the znode to make sure its still there
625            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
626          }
627        }
628        getSplitLogManagerCoordination().checkTasks();
629        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
630        LOG.debug("resubmitting unassigned task(s) after timeout");
631      }
632      Set<String> failedDeletions =
633        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
634      // Retry previously failed deletes
635      if (failedDeletions.size() > 0) {
636        List<String> tmpPaths = new ArrayList<>(failedDeletions);
637        for (String tmpPath : tmpPaths) {
638          // deleteNode is an async call
639          getSplitLogManagerCoordination().deleteTask(tmpPath);
640        }
641        failedDeletions.removeAll(tmpPaths);
642      }
643    }
644  }
645
646  public enum ResubmitDirective {
647    CHECK(), FORCE()
648  }
649
650  public enum TerminationStatus {
651    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
652
653    final String statusMsg;
654
655    TerminationStatus(String msg) {
656      statusMsg = msg;
657    }
658
659    @Override
660    public String toString() {
661      return statusMsg;
662    }
663  }
664}