001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
021import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.Collections;
029import java.util.HashSet;
030import java.util.List;
031import java.util.Map;
032import java.util.Set;
033import java.util.concurrent.ConcurrentHashMap;
034import java.util.concurrent.ConcurrentMap;
035import java.util.concurrent.atomic.AtomicInteger;
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.fs.FileStatus;
038import org.apache.hadoop.fs.FileSystem;
039import org.apache.hadoop.fs.Path;
040import org.apache.hadoop.fs.PathFilter;
041import org.apache.hadoop.hbase.ChoreService;
042import org.apache.hadoop.hbase.ScheduledChore;
043import org.apache.hadoop.hbase.ServerName;
044import org.apache.hadoop.hbase.SplitLogCounters;
045import org.apache.hadoop.hbase.Stoppable;
046import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
047import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
048import org.apache.hadoop.hbase.log.HBaseMarkers;
049import org.apache.hadoop.hbase.monitoring.MonitoredTask;
050import org.apache.hadoop.hbase.monitoring.TaskMonitor;
051import org.apache.hadoop.hbase.procedure2.util.StringUtils;
052import org.apache.hadoop.hbase.util.CommonFSUtils;
053import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
054import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
055import org.apache.yetus.audience.InterfaceAudience;
056import org.slf4j.Logger;
057import org.slf4j.LoggerFactory;
058import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
059
060/**
061 * Distributes the task of log splitting to the available region servers.
062 * Coordination happens via coordination engine. For every log file that has to be split a
063 * task is created. SplitLogWorkers race to grab a task.
064 *
065 * <p>SplitLogManager monitors the tasks that it creates using the
066 * timeoutMonitor thread. If a task's progress is slow then
067 * {@link SplitLogManagerCoordination#checkTasks} will take away the
068 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
069 * and the task will be up for grabs again. When the task is done then it is
070 * deleted by SplitLogManager.
071 *
072 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
073 * log files. The caller thread waits in this method until all the log files
074 * have been split.
075 *
076 * <p>All the coordination calls made by this class are asynchronous. This is mainly
077 * to help reduce response time seen by the callers.
078 *
079 * <p>There is race in this design between the SplitLogManager and the
080 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
081 * already been completed by a SplitLogWorker. We rely on the idempotency of
082 * the log splitting task for correctness.
083 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that old "delete task"
 * can delete the re-submission.
088 * @see SplitWALManager for an alternate implementation based on Procedures.
089 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
090 *   distributed WAL splitter, see SplitWALManager.
091 */
092@Deprecated
093@InterfaceAudience.Private
094public class SplitLogManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

  private final MasterServices server;

  private final Configuration conf;
  // Runs the TimeoutMonitor chore; null when procedure-based WAL splitting is configured
  // (i.e. no CoordinatedStateManager) -- see the constructor.
  private final ChoreService choreService;

  // Default for "hbase.splitlog.manager.unassigned.timeout".
  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  // How long all tasks may remain unassigned before the TimeoutMonitor pings workers/rescans.
  private long unassignedTimeout;
  // Time the most recent task was enqueued; Long.MAX_VALUE until the first task is created.
  private long lastTaskCreateTime = Long.MAX_VALUE;

  // All in-flight split tasks, keyed by task node path.
  @VisibleForTesting
  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
  private TimeoutMonitor timeoutMonitor;

  // Workers reported dead since the last TimeoutMonitor run; the chore swaps this out
  // atomically. All reads/writes are guarded by deadWorkersLock.
  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();
113
  /**
   * Its OK to construct this object even when region-servers are not online. It does lookup the
   * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
   * @param master the master services
   * @param conf the HBase configuration
   * @throws IOException if initialization of the split-log coordination fails
   */
  public SplitLogManager(MasterServices master, Configuration conf)
      throws IOException {
    this.server = master;
    this.conf = conf;
    // If no CoordinatedStateManager, skip registering as a chore service (The
    // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
    // splitting. It is null if we are configured to use procedure-based distributed WAL
    // splitting.
    if (server.getCoordinatedStateManager() != null) {
      this.choreService =
        new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
      // NOTE(review): details are installed before init(); presumably init() relies on them
      // when looking up orphan tasks -- keep this ordering.
      coordination.setDetails(details);
      coordination.init();
      this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
      // The TimeoutMonitor periodically resubmits stalled/dead-worker tasks.
      this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
          master);
      this.choreService.scheduleChore(timeoutMonitor);
    } else {
      this.choreService = null;
      this.timeoutMonitor = null;
    }
  }
148
  /**
   * @return the split-log coordination of the server's CoordinatedStateManager. Callers must
   *         only invoke this when the CoordinatedStateManager is non-null (ZK-based splitting).
   */
  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
  }
152
  /**
   * List the WAL files under {@code logDirs} using this manager's configuration.
   * @see #getFileList(Configuration, List, PathFilter)
   */
  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }
156
157  /**
158   * Get a list of paths that need to be split given a set of server-specific directories and
159   * optionally  a filter.
160   *
161   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
162   * layout.
163   *
164   * Should be package-private, but is needed by
165   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
166   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
167   */
168  @VisibleForTesting
169  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
170      final PathFilter filter)
171      throws IOException {
172    List<FileStatus> fileStatus = new ArrayList<>();
173    for (Path logDir : logDirs) {
174      final FileSystem fs = logDir.getFileSystem(conf);
175      if (!fs.exists(logDir)) {
176        LOG.warn(logDir + " doesn't exist. Nothing to do!");
177        continue;
178      }
179      FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter);
180      if (logfiles == null || logfiles.length == 0) {
181        LOG.info("{} dir is empty, no logs to split.", logDir);
182      } else {
183        Collections.addAll(fileStatus, logfiles);
184      }
185    }
186    FileStatus[] a = new FileStatus[fileStatus.size()];
187    return fileStatus.toArray(a);
188  }
189
190  /**
191   * @param logDir one region sever wal dir path in .logs
192   * @throws IOException if there was an error while splitting any log file
193   * @return cumulative size of the logfiles split
194   * @throws IOException
195   */
196  public long splitLogDistributed(final Path logDir) throws IOException {
197    List<Path> logDirs = new ArrayList<>();
198    logDirs.add(logDir);
199    return splitLogDistributed(logDirs);
200  }
201
202  /**
203   * The caller will block until all the log files of the given region server have been processed -
204   * successfully split or an error is encountered - by an available worker region server. This
205   * method must only be called after the region servers have been brought online.
206   * @param logDirs List of log dirs to split
207   * @throws IOException If there was an error while splitting any log file
208   * @return cumulative size of the logfiles split
209   */
210  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
211    if (logDirs.isEmpty()) {
212      return 0;
213    }
214    Set<ServerName> serverNames = new HashSet<>();
215    for (Path logDir : logDirs) {
216      try {
217        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
218        if (serverName != null) {
219          serverNames.add(serverName);
220        }
221      } catch (IllegalArgumentException e) {
222        // ignore invalid format error.
223        LOG.warn("Cannot parse server name from " + logDir);
224      }
225    }
226    return splitLogDistributed(serverNames, logDirs, null);
227  }
228
  /**
   * The caller will block until all the hbase:meta log files of the given region server have been
   * processed - successfully split or an error is encountered - by an available worker region
   * server. This method must only be called after the region servers have been brought online.
   * @param serverNames servers whose WALs are being split; used only for status/log messages
   * @param logDirs List of log dirs to split
   * @param filter the Path filter to select specific files for considering; may be null
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    long totalSize = 0;
    TaskBatch batch = null;
    long startTime = 0;
    FileStatus[] logfiles = getFileList(logDirs, filter);
    if (logfiles.length != 0) {
      status.setStatus("Checking directory contents...");
      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
      LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
          " for " + serverNames);
      startTime = EnvironmentEdgeManager.currentTime();
      batch = new TaskBatch();
      for (FileStatus lf : logfiles) {
        // TODO If the log file is still being written to - which is most likely
        // the case for the last log file - then its length will show up here
        // as zero. The size of such a file can only be retrieved after
        // recover-lease is done. totalSize will be under in most cases and the
        // metrics that it drives will also be under-reported.
        totalSize += lf.getLen();
        String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf);
        if (!enqueueSplitTask(pathToLog, batch)) {
          throw new IOException("duplicate log split scheduled for " + lf.getPath());
        }
      }
      // Blocks until every enqueued task is done/errored, the server stops, or we are
      // interrupted.
      waitForSplittingCompletion(batch, status);
    }

    // If some tasks never completed (server stopping, interruption, ...), poison the batch
    // so late completions are ignored, and fail the whole split.
    if (batch != null && batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    // Best-effort cleanup of the now-split source directories.
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        // Splitting itself succeeded; if files remain we still report success and only log
        // what was left behind.
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
    }
    String msg =
        "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" +
            totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " +
            logDirs + " in " +
            ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) +
            "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }
306
307  /**
308   * Add a task entry to coordination if it is not already there.
309   * @param taskname the path of the log to be split
310   * @param batch the batch this task belongs to
311   * @return true if a new entry is created, false if it is already there.
312   */
313  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
314    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
315    String task = getSplitLogManagerCoordination().prepareTask(taskname);
316    Task oldtask = createTaskIfAbsent(task, batch);
317    if (oldtask == null) {
318      // publish the task in the coordination engine
319      getSplitLogManagerCoordination().submitTask(task);
320      return true;
321    }
322    return false;
323  }
324
325  /**
326   * Get the amount of time in milliseconds to wait till next check.
327   * Check less frequently if a bunch of work to do still. At a max, check every minute.
328   * At a minimum, check every 100ms. This is to alleviate case where perhaps there are a bunch of
329   * threads waiting on a completion. For example, if the zk-based implementation, we will scan the
330   * '/hbase/splitWAL' dir every time through this loop. If there are lots of WALs to
331   * split -- could be tens of thousands if big cluster -- then it will take a while. If
332   * the Master has many SCPs waiting on wal splitting -- could be up to 10 x the configured
333   * PE thread count (default would be 160) -- then the Master will be putting up a bunch of
334   * load on zk.
335   */
336  static int getBatchWaitTimeMillis(int remainingTasks) {
337    return remainingTasks < 10? 100: remainingTasks < 100? 1000: 60_000;
338  }
339
340  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
341    synchronized (batch) {
342      while ((batch.done + batch.error) != batch.installed) {
343        try {
344          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
345              + batch.installed + " done=" + batch.done + " error=" + batch.error);
346          int remaining = batch.installed - (batch.done + batch.error);
347          int actual = activeTasks(batch);
348          if (remaining != actual) {
349            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
350          }
351          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
352          if (remainingTasks >= 0 && actual > remainingTasks) {
353            LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
354                + remainingTasks);
355          }
356          if (remainingTasks == 0 || actual == 0) {
357            LOG.warn("No more task remaining, splitting "
358                + "should have completed. Remaining tasks is " + remainingTasks
359                + ", active tasks in map " + actual);
360            if (remainingTasks == 0 && actual == 0) {
361              return;
362            }
363          }
364          batch.wait(getBatchWaitTimeMillis(remainingTasks));
365          if (server.isStopped()) {
366            LOG.warn("Stopped while waiting for log splits to be completed");
367            return;
368          }
369        } catch (InterruptedException e) {
370          LOG.warn("Interrupted while waiting for log splits to be completed");
371          Thread.currentThread().interrupt();
372          return;
373        }
374      }
375    }
376  }
377
  /**
   * Exposes the live task map for tests; callers must not assume a stable snapshot.
   */
  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }
382
383  private int activeTasks(final TaskBatch batch) {
384    int count = 0;
385    for (Task t : tasks.values()) {
386      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
387        count++;
388      }
389    }
390    return count;
391
392  }
393
  /**
   * Install a new Task for {@code path} into the task map, dealing with any pre-existing
   * (possibly orphaned) entry for the same path.
   * @param path the task node path
   * @param batch the batch the newly created task should belong to
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      // Common case: no previous entry, our task is now installed.
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          // Adopt the still-running orphan into this batch so the caller waits on it.
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        // FAILURE: a delete of the task node is pending; wait for the delete callback
        // to move the task to DELETED (oldtask.notify), or bail on interrupt.
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newTask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        // Should be impossible: a DELETED task must have been removed from the map.
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      // Non-orphan entry: another batch is already waiting on this path.
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }
457
458  public void stop() {
459    if (choreService != null) {
460      choreService.shutdown();
461    }
462    if (timeoutMonitor != null) {
463      timeoutMonitor.cancel(true);
464    }
465  }
466
467  void handleDeadWorker(ServerName workerName) {
468    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
469    // to reason about concurrency. Makes it easier to retry.
470    synchronized (deadWorkersLock) {
471      if (deadWorkers == null) {
472        deadWorkers = new HashSet<>(100);
473      }
474      deadWorkers.add(workerName);
475    }
476    LOG.info("Dead splitlog worker {}", workerName);
477  }
478
479  void handleDeadWorkers(Set<ServerName> serverNames) {
480    synchronized (deadWorkersLock) {
481      if (deadWorkers == null) {
482        deadWorkers = new HashSet<>(100);
483      }
484      deadWorkers.addAll(serverNames);
485    }
486    LOG.info("dead splitlog workers " + serverNames);
487  }
488
489  /**
490   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
491   * Clients threads use this object to wait for all their tasks to be done.
492   * <p>
493   * All access is synchronized.
494   */
495  @InterfaceAudience.Private
496  public static class TaskBatch {
497    public int installed = 0;
498    public int done = 0;
499    public int error = 0;
500    public volatile boolean isDead = false;
501
502    @Override
503    public String toString() {
504      return ("installed = " + installed + " done = " + done + " error = " + error);
505    }
506  }
507
  /**
   * in memory state of an active task. All mutable fields are volatile so the
   * TimeoutMonitor and coordination callbacks see each other's updates.
   */
  @InterfaceAudience.Private
  public static class Task {
    // Timestamp of the last heartbeat/update; -1 while unassigned (see setUnassigned()).
    public volatile long last_update;
    // Last task-node version observed; -1 until the first heartbeat with details.
    public volatile int last_version;
    // Worker currently holding the task, or null when unassigned.
    public volatile ServerName cur_worker_name;
    // Owning batch; null (or batch.isDead) marks this task as an orphan.
    public volatile TaskBatch batch;
    // IN_PROGRESS / SUCCESS / FAILURE / DELETED; starts as IN_PROGRESS.
    public volatile TerminationStatus status;
    // NOTE(review): presumably counts re-creations of the task node -- confirm against
    // the coordination implementation before relying on it.
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    // NOTE(review): appears to count non-forced resubmits; semantics live in the
    // coordination's resubmit logic.
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    // A task is an orphan when no live batch is waiting on it.
    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    // Refresh the liveness timestamp without changing worker/version.
    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    // Full heartbeat: record version, time and the worker that owns the task.
    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    // Drop the worker association; last_update = -1 flags "never/not currently assigned".
    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }
558
  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out
   * or were owned by a worker that has since died. Also pings unassigned tasks and
   * retries failed task-node deletions.
   */
  private class TimeoutMonitor extends ScheduledChore {
    // Last time the periodic summary line was logged (rate-limited to every 5s).
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      // Nothing to do when procedure-based splitting is configured (no coordination).
      if (server.getCoordinatedStateManager() == null) {
        return;
      }

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      // Atomically take ownership of the accumulated dead-worker set.
      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          // Owner is dead: force a resubmit; if that fails, re-register the worker as
          // dead so we try again on the next chore run.
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          // CHECK lets the coordination decide whether this task has timed out.
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks wills stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO may be the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure its still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        // Snapshot first: deleteTask is async and may re-add entries concurrently.
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }
665
  /**
   * How aggressively to resubmit a task: CHECK lets the coordination decide whether a
   * resubmit is warranted (e.g. timed-out heartbeat), FORCE resubmits unconditionally
   * (used when the owning worker is known to be dead).
   */
  public enum ResubmitDirective {
    CHECK(), FORCE()
  }
669
  /**
   * Lifecycle states of a split task; {@link #toString()} yields the lower-case message
   * used in logs and task rendering.
   */
  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    // Human-readable form returned by toString().
    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
684}