/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via the coordination engine. For every log file that has to be split a
 * task is created. SplitLogWorkers race to grab a task.
 *
 * <p>SplitLogManager monitors the tasks that it creates using the
 * timeoutMonitor thread. If a task's progress is slow then
 * {@link SplitLogManagerCoordination#checkTasks} will take away the
 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
 * and the task will be up for grabs again. When the task is done then it is
 * deleted by SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * <p>All the coordination calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. The SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that the old "delete task"
 * event can delete the re-submission.
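 *
 * <p>A minimal usage sketch (assumes a running master's {@code MasterServices} and a
 * {@code walDir} pointing at a dead region server's WAL directory; error handling elided):
 * <pre>{@code
 * SplitLogManager manager = new SplitLogManager(masterServices, conf);
 * // Blocks until every log file under walDir has been split by some region server.
 * long bytesSplit = manager.splitLogDistributed(new Path(walDir));
 * }</pre>
 *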
 * @see SplitWALManager for an alternate implementation based on Procedures.
 */
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);

  private final MasterServices server;

  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;

  @VisibleForTesting
  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * It's OK to construct this object even when region servers are not online. It does look up
   * orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
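   *
   * <p>Relevant tuning properties read here (a configuration sketch; the values shown are the
   * defaults applied by the constructor below):
   * <pre>{@code
   * conf.setInt("hbase.splitlog.manager.unassigned.timeout", 3 * 60 * 1000);
   * conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
   * }</pre>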
   * @param master the master services
   * @param conf the HBase configuration
   * @throws IOException if the manager fails to initialize
   */
  public SplitLogManager(MasterServices master, Configuration conf)
      throws IOException {
    this.server = master;
    this.conf = conf;
    // If there is no CoordinatedStateManager, skip registering as a chore service. (The
    // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
    // splitting. It is null if we are configured to use procedure-based distributed WAL
    // splitting.)
    if (server.getCoordinatedStateManager() != null) {
      this.choreService =
        new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
      coordination.setDetails(details);
      coordination.init();
      this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
      this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
          master);
      this.choreService.scheduleChore(timeoutMonitor);
    } else {
      this.choreService = null;
      this.timeoutMonitor = null;
    }
  }

  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of paths that need to be split given a set of server-specific directories and
   * optionally a filter.
   *
   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
   * layout.
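   *
   * <p>For illustration, a server's WAL directory typically looks like the sketch below
   * (host name, port and start code are made-up values):
   * <pre>
   *   &lt;walRootDir&gt;/WALs/host.example.com,16020,1588230740/
   *       host.example.com%2C16020%2C1588230740.1588230740123
   * </pre>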
   *
   * Should be package-private, but is needed by
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
   *     Configuration, org.apache.hadoop.hbase.wal.WALFactory)} for tests.
   */
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter) throws IOException {
    List<FileStatus> fileStatus = new ArrayList<>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn("{} doesn't exist. Nothing to do!", logDir);
        continue;
      }
      FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info("{} dir is empty, no logs to split.", logDir);
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    return fileStatus.toArray(new FileStatus[0]);
  }

  /**
   * @param logDir one region server WAL dir path in .logs
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // Ignore directories whose names do not parse as a server name.
        LOG.warn("Cannot parse server name from " + logDir, e);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the log files of the given region servers have been
   * processed - successfully split or an error is encountered - by an available worker region
   * server. This method must only be called after the region servers have been brought online.
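   *
   * <p>For example, to restrict the split to hbase:meta WALs, a meta filter can be passed in
   * (a sketch; it assumes {@link AbstractFSWALProvider#isMetaFile(Path)} as the filter):
   * <pre>{@code
   * long bytes = splitLogManager.splitLogDistributed(serverNames, logDirs,
   *   AbstractFSWALProvider::isMetaFile);
   * }</pre>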
   * @param serverNames servers whose WALs are to be split
   * @param logDirs List of log dirs to split
   * @param filter the Path filter to select specific files to consider
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    long totalSize = 0;
    TaskBatch batch = null;
    long startTime = 0;
    FileStatus[] logfiles = getFileList(logDirs, filter);
    if (logfiles.length != 0) {
      status.setStatus("Checking directory contents...");
      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
      LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
          " for " + serverNames);
      startTime = EnvironmentEdgeManager.currentTime();
      batch = new TaskBatch();
      for (FileStatus lf : logfiles) {
        // TODO If the log file is still being written to - which is most likely
        // the case for the last log file - then its length will show up here
        // as zero. The size of such a file can only be retrieved after
        // recover-lease is done. totalSize will be an underestimate in most
        // cases and the metrics that it drives will also be under-reported.
        totalSize += lf.getLen();
        String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf);
        if (!enqueueSplitTask(pathToLog, batch)) {
          throw new IOException("duplicate log split scheduled for " + lf.getPath());
        }
      }
      waitForSplittingCompletion(batch, status);
    }

    if (batch != null && batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
    }
    String msg =
        "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize) + " (" +
            totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed) + " log files in " +
            logDirs + " in " +
            ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) +
            "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task = getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  /**
   * Get the amount of time in milliseconds to wait till the next check.
   * Check less frequently when there is still a bunch of work to do. At a maximum, check every
   * minute; at a minimum, check every 100ms. This is to alleviate the case where perhaps there
   * are a bunch of threads waiting on a completion. For example, in the zk-based implementation,
   * we scan the '/hbase/splitWAL' dir every time through this loop. If there are lots of WALs to
   * split -- could be tens of thousands on a big cluster -- then it will take a while. If
   * the Master has many SCPs waiting on wal splitting -- could be up to 10 x the configured
   * PE thread count (default would be 160) -- then the Master will be putting up a bunch of
   * load on zk.
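   *
   * <p>For example, {@code getBatchWaitTimeMillis(5)} returns 100,
   * {@code getBatchWaitTimeMillis(50)} returns 1000, and {@code getBatchWaitTimeMillis(5000)}
   * returns 60000.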
   */
  static int getBatchWaitTimeMillis(int remainingTasks) {
    return remainingTasks < 10 ? 100 : remainingTasks < 100 ? 1000 : 60_000;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more tasks remaining, splitting "
                + "should have completed. Remaining tasks is " + remainingTasks
                + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(getBatchWaitTimeMillis(remainingTasks));
          if (server.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * @param path the path of the task node in the coordination engine
   * @param batch the batch this task should join
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return null;
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newtask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("Dead splitlog worker {}", workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("Dead splitlog workers {}", serverNames);
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      if (server.getCoordinatedStateManager() == null) {
        return;
      }

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO maybe the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always at least one worker in the system.
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do the task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (!failedDeletions.isEmpty()) {
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }

  public enum ResubmitDirective {
    CHECK, FORCE
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    final String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}