/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via the coordination engine. For every log file that has to be split a
 * task is created. SplitLogWorkers race to grab a task.
 *
 * <p>SplitLogManager monitors the tasks that it creates using the
 * timeoutMonitor thread. If a task's progress is slow then
 * {@link SplitLogManagerCoordination#checkTasks} will take away the
 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
 * and the task will be up for grabs again. When the task is done then it is
 * deleted by SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * <p>All the coordination calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that the old "delete task"
 * can delete the re-submission.
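 *
 * <p>A hypothetical usage sketch (names such as {@code masterServices} and
 * {@code walRootDir} are illustrative only; in practice the master's failover and
 * server-crash handling drives this class):
 * <pre>
 * SplitLogManager slm = new SplitLogManager(masterServices, conf);
 * // Blocks until every WAL under the dead server's splitting dir has been split.
 * long bytesSplit = slm.splitLogDistributed(
 *     new Path(walRootDir, "WALs/host1,16020,1600000000000-splitting"));
 * </pre>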
 * @see SplitWALManager for an alternate implementation based on Procedures.
 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
 *   distributed WAL splitter, see SplitWALManager.
 */
@Deprecated
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

  private final MasterServices server;

  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;

  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * It's OK to construct this object even when region servers are not online. It does look up
   * orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
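   * <p>Configuration knobs read here (see the constructor body for how they are applied):
   * {@code hbase.splitlog.manager.unassigned.timeout}, default 3 minutes, and
   * {@code hbase.splitlog.manager.timeoutmonitor.period}, default 1000 ms.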
   * @param master the master services
   * @param conf the HBase configuration
   * @throws IOException if initialization fails
   */
  public SplitLogManager(MasterServices master, Configuration conf)
      throws IOException {
    this.server = master;
    this.conf = conf;
    // If no CoordinatedStateManager, skip registering as a chore service. (The
    // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
    // splitting. It is null if we are configured to use procedure-based distributed WAL
    // splitting.)
    if (server.getCoordinatedStateManager() != null) {
      this.choreService =
        new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
      coordination.setDetails(details);
      coordination.init();
      this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
      this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
          master);
      this.choreService.scheduleChore(timeoutMonitor);
    } else {
      this.choreService = null;
      this.timeoutMonitor = null;
    }
  }

  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
  }

  private List<FileStatus> getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of paths that need to be split given a set of server-specific directories and
   * optionally a filter.
   *
   * <p>See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on
   * directory layout.
   *
   * <p>Should be package-private, but is needed by
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
   */
  public static List<FileStatus> getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter) throws IOException {
    List<FileStatus> fileStatus = new ArrayList<>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn("{} doesn't exist. Nothing to do!", logDir);
        continue;
      }
      FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info("{} dir is empty, no logs to split.", logDir);
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }

    return fileStatus;
  }

  /**
   * @param logDir one region server WAL dir path under the WALs directory
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore invalid format error.
        LOG.warn("Cannot parse server name from {}", logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the log files of the given region servers that pass the
   * filter have been processed - successfully split or an error is encountered - by an available
   * worker region server. This method must only be called after the region servers have been
   * brought online.
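   * <p>An illustrative sketch (names here are hypothetical, not taken from calling code):
   * <pre>
   * // Split only the meta WALs left behind by the dead servers:
   * long bytes = splitLogManager.splitLogDistributed(deadServers, logDirs,
   *     p -> AbstractFSWALProvider.isMetaFile(p));
   * </pre>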
   * @param serverNames a set of server names whose WALs are to be split
   * @param logDirs List of log dirs to split
   * @param filter the PathFilter to select specific files to consider; may be null
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    long totalSize = 0;
    TaskBatch batch = null;
    long startTime = 0;
    List<FileStatus> logfiles = getFileList(logDirs, filter);
    if (!logfiles.isEmpty()) {
      status.setStatus("Checking directory contents...");
      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
      LOG.info("Started splitting " + logfiles.size() + " logs in " + logDirs +
          " for " + serverNames);
      startTime = EnvironmentEdgeManager.currentTime();
      batch = new TaskBatch();
      for (FileStatus lf : logfiles) {
        // TODO If the log file is still being written to - which is most likely
        // the case for the last log file - then its length will show up here
        // as zero. The size of such a file can only be retrieved after
        // recover-lease is done. totalSize will be under-reported in most cases
        // and the metrics that it drives will also be under-reported.
        totalSize += lf.getLen();
        String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf);
        if (!enqueueSplitTask(pathToLog, batch)) {
          throw new IOException("duplicate log split scheduled for " + lf.getPath());
        }
      }
      waitForSplittingCompletion(batch, status);
    }

    if (batch != null && batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
      LOG.warn("Error while splitting logs in " + logDirs + "; installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
    }
    String msg =
        "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" +
            totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " +
            logDirs + " in " +
            ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) +
            "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
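    // Record the creation time; the TimeoutMonitor compares it against the
    // unassigned timeout to decide when a batch of all-unassigned tasks has
    // waited long enough to warrant poking the workers again.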
    String task = getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  /**
   * Get the amount of time in milliseconds to wait till the next check.
   * Check less frequently if there is still a bunch of work to do. Wait at most one minute
   * between checks, and at least 100ms. This is to alleviate the case where perhaps there are
   * a bunch of threads waiting on a completion. For example, in the zk-based implementation,
   * we will scan the '/hbase/splitWAL' dir every time through this loop. If there are lots of
   * WALs to split -- could be tens of thousands on a big cluster -- then it will take a while.
   * If the Master has many SCPs waiting on wal splitting -- could be up to 10 x the configured
   * PE thread count (default would be 160) -- then the Master will be putting up a bunch of
   * load on zk.
   */
  static int getBatchWaitTimeMillis(int remainingTasks) {
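    // Backoff schedule: fewer than 10 tasks remaining -> poll every 100ms;
    // fewer than 100 -> every second; otherwise only once a minute.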
    return remainingTasks < 10 ? 100 : remainingTasks < 100 ? 1000 : 60_000;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
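      // Completion callbacks from the coordination layer update batch.done /
      // batch.error and notify() on the batch, so this wait is bounded both by
      // that notification and by the timed backoff from getBatchWaitTimeMillis().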
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more tasks remaining, splitting should have completed. Remaining tasks"
                + " is " + remainingTasks + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(getBatchWaitTimeMillis(remainingTasks));
          if (server.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * @param path the path of the log file the task will split
   * @param batch the batch the new task should be attached to
   * @return null on success, the existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
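      // An orphan task has no live batch: either it was discovered in the
      // coordination engine at startup, or its original batch was marked dead.
      // Adopt it into this batch if it is still running; if it already
      // succeeded there is nothing left to wait for.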
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return null;
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task state still present."
              + " Waiting for znode delete callback path=" + path);
          return oldtask;
        }
        // reinsert the newTask; it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("Dead splitlog worker {}", workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("Dead splitlog workers {}", serverNames);
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      if (server.getCoordinatedStateManager() == null) {
        return;
      }

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

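      // The swap above lets this chore work off a private snapshot without
      // holding the lock; workers reported dead while we iterate go into a
      // fresh set that the next run will pick up.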
      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes because
      // a. it is very unlikely that every worker had a transient error when
      // trying to grab the task, and b. if there are no workers then all tasks
      // will stay unassigned indefinitely and the manager will be indefinitely
      // creating RESCAN nodes. TODO maybe the master should spawn both a
      // manager and a worker thread to guarantee that there is always at least
      // one worker in the system.
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do the task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (!failedDeletions.isEmpty()) {
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE()
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}