/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributes the task of log splitting to the available region servers. Coordination happens via
 * the coordination engine. For every log file that has to be split a task is created.
 * SplitLogWorkers race to grab a task.
 * <p>
 * SplitLogManager monitors the tasks that it creates using the timeoutMonitor thread. If a task's
 * progress is slow then {@link SplitLogManagerCoordination#checkTasks} will take away the task from
 * the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and the task will be up for
 * grabs again. When the task is done then it is deleted by SplitLogManager.
 * <p>
 * Clients call {@link #splitLogDistributed(Path)} to split a region server's log files. The caller
 * thread waits in this method until all the log files have been split.
 * <p>
 * All the coordination calls made by this class are asynchronous. This is mainly to help reduce
 * response time seen by the callers.
 * <p>
 * There is a race in this design between the SplitLogManager and the SplitLogWorker.
 * SplitLogManager might re-queue a task that has in reality already been completed by a
 * SplitLogWorker. We rely on the idempotency of the log splitting task for correctness.
 * <p>
 * It is also assumed that every log splitting task is unique and once completed (either with
 * success or with error) it will not be submitted again. If a task is resubmitted then there is a
 * risk that the old "delete task" can delete the re-submission.
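 * <p>
 * A minimal usage sketch (the WAL directory name below is illustrative; callers normally obtain
 * it from the master's WAL management code):
 *
 * <pre>{@code
 * SplitLogManager slm = new SplitLogManager(masterServices, conf);
 * // Blocks until every log file under the directory has been split by some worker.
 * long bytesSplit = slm.splitLogDistributed(
 *   new Path("/hbase/WALs/rs1.example.com,16020,1600000000000-splitting"));
 * slm.stop();
 * }</pre>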
 * @see SplitWALManager for an alternate implementation based on Procedures.
 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
 *             distributed WAL splitter, see SplitWALManager.
 */
@Deprecated
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

  private final MasterServices server;

  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;

  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();
  /**
   * It's OK to construct this object even when region servers are not online. It does look up
   * orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
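   * <p>
   * The timeout and chore period used below can be tuned via configuration before constructing
   * this class (the keys are the ones read in this constructor; the values shown are
   * illustrative):
   *
   * <pre>{@code
   * conf.setInt("hbase.splitlog.manager.unassigned.timeout", 2 * 60 * 1000);
   * conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
   * }</pre>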
   * @param master the master services
   * @param conf   the HBase configuration
   * @throws IOException if initializing the coordination engine fails
   */
  public SplitLogManager(MasterServices master, Configuration conf) throws IOException {
    this.server = master;
    this.conf = conf;
    // If no CoordinatedStateManager, skip registering as a chore service. (The
    // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
    // splitting. It is null if we are configured to use procedure-based distributed WAL
    // splitting.)
    if (server.getCoordinatedStateManager() != null) {
      this.choreService =
        new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
      coordination.setDetails(details);
      coordination.init();
      this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
      this.timeoutMonitor = new TimeoutMonitor(
        conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), master);
      this.choreService.scheduleChore(timeoutMonitor);
    } else {
      this.choreService = null;
      this.timeoutMonitor = null;
    }
  }

  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
  }

  private List<FileStatus> getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of the log files that need to be split, given a set of server-specific directories
   * and optionally a filter.
   * <p/>
   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
   * layout.
   * <p/>
   * Should be package-private, but is needed by
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, Configuration, org.apache.hadoop.hbase.wal.WALFactory)}
   * for tests.
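   * <p/>
   * A minimal sketch of a direct call (the directory path is illustrative):
   *
   * <pre>{@code
   * Configuration conf = HBaseConfiguration.create();
   * List<Path> dirs =
   *   Arrays.asList(new Path("/hbase/WALs/rs1.example.com,16020,1600000000000-splitting"));
   * // A null filter selects every file in the given directories.
   * List<FileStatus> toSplit = SplitLogManager.getFileList(conf, dirs, null);
   * }</pre>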
   */
  public static List<FileStatus> getFileList(final Configuration conf, final List<Path> logDirs,
    final PathFilter filter) throws IOException {
    List<FileStatus> fileStatus = new ArrayList<>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info("{} dir is empty, no logs to split.", logDir);
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }

    return fileStatus;
  }

  /**
   * @param logDir one region server WAL dir path in .logs
   * @throws IOException if there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore invalid format error.
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the log files of the given region servers that pass the
   * filter have been processed - successfully split or an error is encountered - by an available
   * worker region server. This method must only be called after the region servers have been
   * brought online.
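   * <p>
   * A minimal sketch with a filter (the meta-WAL suffix check is illustrative):
   *
   * <pre>{@code
   * PathFilter metaFilter = p -> p.getName().endsWith(".meta");
   * long bytesSplit = splitLogDistributed(serverNames, logDirs, metaFilter);
   * }</pre>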
   * @param serverNames the region servers whose WAL dirs are being split
   * @param logDirs     List of log dirs to split
   * @param filter      the PathFilter to select specific log files for consideration
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
    PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get()
      .createStatus("Doing distributed log split in " + logDirs + " for serverName=" + serverNames);
    long totalSize = 0;
    TaskBatch batch = null;
    long startTime = 0;
    List<FileStatus> logfiles = getFileList(logDirs, filter);
    if (!logfiles.isEmpty()) {
      status.setStatus("Checking directory contents...");
      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
      LOG.info(
        "Started splitting " + logfiles.size() + " logs in " + logDirs + " for " + serverNames);
      startTime = EnvironmentEdgeManager.currentTime();
      batch = new TaskBatch();
      for (FileStatus lf : logfiles) {
        // TODO If the log file is still being written to - which is most likely
        // the case for the last log file - then its length will show up here
        // as zero. The size of such a file can only be retrieved after
        // recover-lease is done. totalSize will be under in most cases and the
        // metrics that it drives will also be under-reported.
        totalSize += lf.getLen();
        String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf);
        if (!enqueueSplitTask(pathToLog, batch)) {
          throw new IOException("duplicate log split scheduled for " + lf.getPath());
        }
      }
      waitForSplittingCompletion(batch, status);
    }

    if (batch != null && batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
        + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn(
            "Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": " + Arrays.toString(files),
            ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
    }
    String msg = "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize)
      + " (" + totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed)
      + " log files in " + logDirs + " in "
      + ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch    the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task = getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  /**
   * Get the amount of time in milliseconds to wait till the next check. Check less frequently if
   * there is still a bunch of work to do. At a max, check every minute. At a minimum, check every
   * 100ms. This is to alleviate the case where perhaps there are a bunch of threads waiting on a
   * completion. For example, in the zk-based implementation, we will scan the '/hbase/splitWAL'
   * dir every time through this loop. If there are lots of WALs to split -- could be tens of
   * thousands on a big cluster -- then it will take a while. If the Master has many SCPs waiting
   * on wal splitting -- could be up to 10 x the configured PE thread count (default would be 160)
   * -- then the Master will be putting up a bunch of load on zk.
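   * <p>
   * For example, with the thresholds used below: {@code getBatchWaitTimeMillis(5)} returns 100,
   * {@code getBatchWaitTimeMillis(50)} returns 1000, and {@code getBatchWaitTimeMillis(5000)}
   * returns 60000.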
   */
  static int getBatchWaitTimeMillis(int remainingTasks) {
    return remainingTasks < 10 ? 100 : remainingTasks < 100 ? 1000 : 60_000;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. scheduled="
            + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
              + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn(
              "No more tasks remaining, splitting should have completed. Remaining tasks is "
                + remainingTasks + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(getBatchWaitTimeMillis(remainingTasks));
          if (server.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * @param path  the path of the log file to be split
   * @param batch the batch this task belongs to
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
            + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newTask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.shutdown(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("Dead splitlog worker {}", workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("Dead splitlog workers {}", serverNames);
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
        + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
        + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      if (server.getCoordinatedStateManager() == null) {
        return;
      }

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn(
              "Failed to resubmit task " + path + " owned by dead " + cur_worker + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO maybe the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (
        tot > 0 && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)
      ) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(),
    FORCE()
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"),
    SUCCESS("success"),
    FAILURE("failure"),
    DELETED("deleted");

    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}