001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master;
019
020import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
021import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
022import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
023import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
024import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
025import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
026
027import java.io.IOException;
028import java.util.ArrayList;
029import java.util.Arrays;
030import java.util.Collections;
031import java.util.HashSet;
032import java.util.List;
033import java.util.Map;
034import java.util.Set;
035import java.util.concurrent.ConcurrentHashMap;
036import java.util.concurrent.ConcurrentMap;
037import java.util.concurrent.atomic.AtomicInteger;
038import org.apache.hadoop.conf.Configuration;
039import org.apache.hadoop.fs.FileStatus;
040import org.apache.hadoop.fs.FileSystem;
041import org.apache.hadoop.fs.Path;
042import org.apache.hadoop.fs.PathFilter;
043import org.apache.hadoop.hbase.ChoreService;
044import org.apache.hadoop.hbase.ScheduledChore;
045import org.apache.hadoop.hbase.ServerName;
046import org.apache.hadoop.hbase.SplitLogCounters;
047import org.apache.hadoop.hbase.Stoppable;
048import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
049import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
050import org.apache.hadoop.hbase.log.HBaseMarkers;
051import org.apache.hadoop.hbase.monitoring.MonitoredTask;
052import org.apache.hadoop.hbase.monitoring.TaskMonitor;
053import org.apache.hadoop.hbase.procedure2.util.StringUtils;
054import org.apache.hadoop.hbase.util.CommonFSUtils;
055import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
056import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
057import org.apache.yetus.audience.InterfaceAudience;
058import org.slf4j.Logger;
059import org.slf4j.LoggerFactory;
060
061/**
062 * Distributes the task of log splitting to the available region servers. Coordination happens via
063 * coordination engine. For every log file that has to be split a task is created. SplitLogWorkers
064 * race to grab a task.
065 * <p>
066 * SplitLogManager monitors the tasks that it creates using the timeoutMonitor thread. If a task's
067 * progress is slow then {@link SplitLogManagerCoordination#checkTasks} will take away the task from
068 * the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} and the task will be up for
069 * grabs again. When the task is done then it is deleted by SplitLogManager.
070 * <p>
071 * Clients call {@link #splitLogDistributed(Path)} to split a region server's log files. The caller
072 * thread waits in this method until all the log files have been split.
073 * <p>
074 * All the coordination calls made by this class are asynchronous. This is mainly to help reduce
075 * response time seen by the callers.
076 * <p>
 * There is a race in this design between the SplitLogManager and the SplitLogWorker.
 * SplitLogManager might re-queue a task that has in reality already been completed by a
 * SplitLogWorker. We rely on the idempotency of the log splitting task for correctness.
080 * <p>
 * It is also assumed that every log splitting task is unique and once completed (either with
 * success or with error) it will not be submitted again. If a task is resubmitted then there is
 * a risk that an old "delete task" can delete the re-submission.
084 * @see SplitWALManager for an alternate implementation based on Procedures.
085 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based
086 *             distributed WAL splitter, see SplitWALManager.
087 */
088@Deprecated
089@InterfaceAudience.Private
090public class SplitLogManager {
091  private static final Logger LOG = LoggerFactory.getLogger(SplitLogManager.class);
092
093  private final MasterServices server;
094
095  private final Configuration conf;
096  private final ChoreService choreService;
097
098  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
099
100  private long unassignedTimeout;
101  private long lastTaskCreateTime = Long.MAX_VALUE;
102
103  final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
104  private TimeoutMonitor timeoutMonitor;
105
106  private volatile Set<ServerName> deadWorkers = null;
107  private final Object deadWorkersLock = new Object();
108
109  /**
110   * Its OK to construct this object even when region-servers are not online. It does lookup the
111   * orphan tasks in coordination engine but it doesn't block waiting for them to be done.
112   * @param master the master services
113   * @param conf   the HBase configuration
114   */
115  public SplitLogManager(MasterServices master, Configuration conf) throws IOException {
116    this.server = master;
117    this.conf = conf;
118    // If no CoordinatedStateManager, skip registering as a chore service (The
119    // CoordinatedStateManager is non-null if we are running the ZK-based distributed WAL
120    // splitting. It is null if we are configured to use procedure-based distributed WAL
121    // splitting.
122    if (server.getCoordinatedStateManager() != null) {
123      this.choreService =
124        new ChoreService(master.getServerName().toShortString() + ".splitLogManager.");
125      SplitLogManagerCoordination coordination = getSplitLogManagerCoordination();
126      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
127      SplitLogManagerDetails details = new SplitLogManagerDetails(tasks, master, failedDeletions);
128      coordination.setDetails(details);
129      coordination.init();
130      this.unassignedTimeout =
131        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
132      this.timeoutMonitor = new TimeoutMonitor(
133        conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), master);
134      this.choreService.scheduleChore(timeoutMonitor);
135    } else {
136      this.choreService = null;
137      this.timeoutMonitor = null;
138    }
139  }
140
141  private SplitLogManagerCoordination getSplitLogManagerCoordination() {
142    return server.getCoordinatedStateManager().getSplitLogManagerCoordination();
143  }
144
145  private List<FileStatus> getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
146    return getFileList(conf, logDirs, filter);
147  }
148
149  /**
150   * Get a list of paths that need to be split given a set of server-specific directories and
151   * optionally a filter.
152   * <p/>
153   * See {@link AbstractFSWALProvider#getServerNameFromWALDirectoryName} for more info on directory
154   * layout.
155   * <p/>
156   * Should be package-private, but is needed by
157   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem, Configuration, org.apache.hadoop.hbase.wal.WALFactory)}
158   * for tests.
159   */
160  public static List<FileStatus> getFileList(final Configuration conf, final List<Path> logDirs,
161    final PathFilter filter) throws IOException {
162    List<FileStatus> fileStatus = new ArrayList<>();
163    for (Path logDir : logDirs) {
164      final FileSystem fs = logDir.getFileSystem(conf);
165      if (!fs.exists(logDir)) {
166        LOG.warn(logDir + " doesn't exist. Nothing to do!");
167        continue;
168      }
169      FileStatus[] logfiles = CommonFSUtils.listStatus(fs, logDir, filter);
170      if (logfiles == null || logfiles.length == 0) {
171        LOG.info("{} dir is empty, no logs to split.", logDir);
172      } else {
173        Collections.addAll(fileStatus, logfiles);
174      }
175    }
176
177    return fileStatus;
178  }
179
180  /**
181   * @param logDir one region sever wal dir path in .logs
182   * @throws IOException if there was an error while splitting any log file
183   * @return cumulative size of the logfiles split
184   */
185  public long splitLogDistributed(final Path logDir) throws IOException {
186    List<Path> logDirs = new ArrayList<>();
187    logDirs.add(logDir);
188    return splitLogDistributed(logDirs);
189  }
190
191  /**
192   * The caller will block until all the log files of the given region server have been processed -
193   * successfully split or an error is encountered - by an available worker region server. This
194   * method must only be called after the region servers have been brought online.
195   * @param logDirs List of log dirs to split
196   * @throws IOException If there was an error while splitting any log file
197   * @return cumulative size of the logfiles split
198   */
199  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
200    if (logDirs.isEmpty()) {
201      return 0;
202    }
203    Set<ServerName> serverNames = new HashSet<>();
204    for (Path logDir : logDirs) {
205      try {
206        ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(logDir);
207        if (serverName != null) {
208          serverNames.add(serverName);
209        }
210      } catch (IllegalArgumentException e) {
211        // ignore invalid format error.
212        LOG.warn("Cannot parse server name from " + logDir);
213      }
214    }
215    return splitLogDistributed(serverNames, logDirs, null);
216  }
217
218  /**
219   * The caller will block until all the hbase:meta log files of the given region server have been
220   * processed - successfully split or an error is encountered - by an available worker region
221   * server. This method must only be called after the region servers have been brought online.
222   * @param logDirs List of log dirs to split
223   * @param filter  the Path filter to select specific files for considering
224   * @throws IOException If there was an error while splitting any log file
225   * @return cumulative size of the logfiles split
226   */
227  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
228    PathFilter filter) throws IOException {
229    MonitoredTask status = TaskMonitor.get()
230      .createStatus("Doing distributed log split in " + logDirs + " for serverName=" + serverNames);
231    long totalSize = 0;
232    TaskBatch batch = null;
233    long startTime = 0;
234    List<FileStatus> logfiles = getFileList(logDirs, filter);
235    if (!logfiles.isEmpty()) {
236      status.setStatus("Checking directory contents...");
237      SplitLogCounters.tot_mgr_log_split_batch_start.increment();
238      LOG.info(
239        "Started splitting " + logfiles.size() + " logs in " + logDirs + " for " + serverNames);
240      startTime = EnvironmentEdgeManager.currentTime();
241      batch = new TaskBatch();
242      for (FileStatus lf : logfiles) {
243        // TODO If the log file is still being written to - which is most likely
244        // the case for the last log file - then its length will show up here
245        // as zero. The size of such a file can only be retrieved after
246        // recover-lease is done. totalSize will be under in most cases and the
247        // metrics that it drives will also be under-reported.
248        totalSize += lf.getLen();
249        String pathToLog = CommonFSUtils.removeWALRootPath(lf.getPath(), conf);
250        if (!enqueueSplitTask(pathToLog, batch)) {
251          throw new IOException("duplicate log split scheduled for " + lf.getPath());
252        }
253      }
254      waitForSplittingCompletion(batch, status);
255    }
256
257    if (batch != null && batch.done != batch.installed) {
258      batch.isDead = true;
259      SplitLogCounters.tot_mgr_log_split_batch_err.increment();
260      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
261        + " but only " + batch.done + " done");
262      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
263      status.abort(msg);
264      throw new IOException(msg);
265    }
266    for (Path logDir : logDirs) {
267      status.setStatus("Cleaning up log directory...");
268      final FileSystem fs = logDir.getFileSystem(conf);
269      try {
270        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
271          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
272        }
273      } catch (IOException ioe) {
274        FileStatus[] files = fs.listStatus(logDir);
275        if (files != null && files.length > 0) {
276          LOG.warn(
277            "Returning success without actually splitting and "
278              + "deleting all the log files in path " + logDir + ": " + Arrays.toString(files),
279            ioe);
280        } else {
281          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
282        }
283      }
284      SplitLogCounters.tot_mgr_log_split_batch_success.increment();
285    }
286    String msg = "Finished splitting (more than or equal to) " + StringUtils.humanSize(totalSize)
287      + " (" + totalSize + " bytes) in " + ((batch == null) ? 0 : batch.installed)
288      + " log files in " + logDirs + " in "
289      + ((startTime == 0) ? startTime : (EnvironmentEdgeManager.currentTime() - startTime)) + "ms";
290    status.markComplete(msg);
291    LOG.info(msg);
292    return totalSize;
293  }
294
295  /**
296   * Add a task entry to coordination if it is not already there.
297   * @param taskname the path of the log to be split
298   * @param batch    the batch this task belongs to
299   * @return true if a new entry is created, false if it is already there.
300   */
301  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
302    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
303    String task = getSplitLogManagerCoordination().prepareTask(taskname);
304    Task oldtask = createTaskIfAbsent(task, batch);
305    if (oldtask == null) {
306      // publish the task in the coordination engine
307      getSplitLogManagerCoordination().submitTask(task);
308      return true;
309    }
310    return false;
311  }
312
313  /**
314   * Get the amount of time in milliseconds to wait till next check. Check less frequently if a
315   * bunch of work to do still. At a max, check every minute. At a minimum, check every 100ms. This
316   * is to alleviate case where perhaps there are a bunch of threads waiting on a completion. For
317   * example, if the zk-based implementation, we will scan the '/hbase/splitWAL' dir every time
318   * through this loop. If there are lots of WALs to split -- could be tens of thousands if big
319   * cluster -- then it will take a while. If the Master has many SCPs waiting on wal splitting --
320   * could be up to 10 x the configured PE thread count (default would be 160) -- then the Master
321   * will be putting up a bunch of load on zk.
322   */
323  static int getBatchWaitTimeMillis(int remainingTasks) {
324    return remainingTasks < 10 ? 100 : remainingTasks < 100 ? 1000 : 60_000;
325  }
326
327  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
328    synchronized (batch) {
329      while ((batch.done + batch.error) != batch.installed) {
330        try {
331          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
332            + batch.installed + " done=" + batch.done + " error=" + batch.error);
333          int remaining = batch.installed - (batch.done + batch.error);
334          int actual = activeTasks(batch);
335          if (remaining != actual) {
336            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
337          }
338          int remainingTasks = getSplitLogManagerCoordination().remainingTasksInCoordination();
339          if (remainingTasks >= 0 && actual > remainingTasks) {
340            LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
341              + remainingTasks);
342          }
343          if (remainingTasks == 0 || actual == 0) {
344            LOG.warn(
345              "No more task remaining, splitting " + "should have completed. Remaining tasks is "
346                + remainingTasks + ", active tasks in map " + actual);
347            if (remainingTasks == 0 && actual == 0) {
348              return;
349            }
350          }
351          batch.wait(getBatchWaitTimeMillis(remainingTasks));
352          if (server.isStopped()) {
353            LOG.warn("Stopped while waiting for log splits to be completed");
354            return;
355          }
356        } catch (InterruptedException e) {
357          LOG.warn("Interrupted while waiting for log splits to be completed");
358          Thread.currentThread().interrupt();
359          return;
360        }
361      }
362    }
363  }
364
365  ConcurrentMap<String, Task> getTasks() {
366    return tasks;
367  }
368
369  private int activeTasks(final TaskBatch batch) {
370    int count = 0;
371    for (Task t : tasks.values()) {
372      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
373        count++;
374      }
375    }
376    return count;
377
378  }
379
  /**
   * Registers a task for {@code path} in the in-memory task map, or adopts an existing orphan task
   * (one with no batch or with a dead batch) for the same path.
   * <p>
   * {@code batch.installed} is incremented whenever the batch has to wait for the task: a brand
   * new task, an adopted in-progress orphan, or a new task that replaces a deleted one. An orphan
   * that already succeeded is reported as success without being counted.
   * @param path  the task path, used as key in the task map
   * @param batch the batch the task should belong to
   * @return null on success, the existing (conflicting) task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      // No previous entry: the new task is now registered.
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          // Adopt the running orphan: from now on this batch waits for it.
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        // A failed orphan is still being cleaned up. Wait on the task's monitor until its status
        // moves away from FAILURE.
        // NOTE(review): presumably the coordination delete callback sets DELETED and notifies
        // the task; that code is not visible here - confirm.
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          // Reached after an interrupt, or if the status changed to something other than DELETED.
          LOG.warn("Failure because previously failed task"
            + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newTask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.error(HBaseMarkers.FATAL, "Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      // The existing task belongs to a live batch, i.e. somebody else is already waiting on it.
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }
439
440  public void stop() {
441    if (choreService != null) {
442      choreService.shutdown();
443    }
444    if (timeoutMonitor != null) {
445      timeoutMonitor.shutdown(true);
446    }
447  }
448
449  void handleDeadWorker(ServerName workerName) {
450    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
451    // to reason about concurrency. Makes it easier to retry.
452    synchronized (deadWorkersLock) {
453      if (deadWorkers == null) {
454        deadWorkers = new HashSet<>(100);
455      }
456      deadWorkers.add(workerName);
457    }
458    LOG.info("Dead splitlog worker {}", workerName);
459  }
460
461  void handleDeadWorkers(Set<ServerName> serverNames) {
462    synchronized (deadWorkersLock) {
463      if (deadWorkers == null) {
464        deadWorkers = new HashSet<>(100);
465      }
466      deadWorkers.addAll(serverNames);
467    }
468    LOG.info("dead splitlog workers " + serverNames);
469  }
470
471  /**
472   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
473   * Clients threads use this object to wait for all their tasks to be done.
474   * <p>
475   * All access is synchronized.
476   */
477  @InterfaceAudience.Private
478  public static class TaskBatch {
479    public int installed = 0;
480    public int done = 0;
481    public int error = 0;
482    public volatile boolean isDead = false;
483
484    @Override
485    public String toString() {
486      return ("installed = " + installed + " done = " + done + " error = " + error);
487    }
488  }
489
490  /**
491   * in memory state of an active task.
492   */
493  @InterfaceAudience.Private
494  public static class Task {
495    public volatile long last_update;
496    public volatile int last_version;
497    public volatile ServerName cur_worker_name;
498    public volatile TaskBatch batch;
499    public volatile TerminationStatus status;
500    public volatile AtomicInteger incarnation = new AtomicInteger(0);
501    public final AtomicInteger unforcedResubmits = new AtomicInteger();
502    public volatile boolean resubmitThresholdReached;
503
504    @Override
505    public String toString() {
506      return ("last_update = " + last_update + " last_version = " + last_version
507        + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
508        + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
509    }
510
511    public Task() {
512      last_version = -1;
513      status = IN_PROGRESS;
514      setUnassigned();
515    }
516
517    public boolean isOrphan() {
518      return (batch == null || batch.isDead);
519    }
520
521    public boolean isUnassigned() {
522      return (cur_worker_name == null);
523    }
524
525    public void heartbeatNoDetails(long time) {
526      last_update = time;
527    }
528
529    public void heartbeat(long time, int version, ServerName worker) {
530      last_version = version;
531      last_update = time;
532      cur_worker_name = worker;
533    }
534
535    public void setUnassigned() {
536      cur_worker_name = null;
537      last_update = -1;
538    }
539  }
540
  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out or whose
   * worker was reported dead. It also pings the workers when every task has been unassigned for
   * longer than the unassigned timeout, and retries task deletions that failed earlier.
   */
  private class TimeoutMonitor extends ScheduledChore {
    // Time of the last periodic summary log line; used to limit it to one every 5 seconds.
    private long lastLog = 0;

    /**
     * @param period  chore period handed to {@link ScheduledChore}
     * @param stopper stoppable handed to {@link ScheduledChore}
     */
    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      // Nothing to monitor without a coordinated state manager.
      if (server.getCoordinatedStateManager() == null) {
        return;
      }

      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      // Take ownership of the dead workers reported so far and reset the shared set, so workers
      // reported from now on are handled by the next run.
      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          // The owner is dead: force the resubmission.
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
          if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            // Re-register the worker as dead so the resubmission is retried on a later run.
            handleDeadWorker(cur_worker);
            LOG.warn(
              "Failed to resubmit task " + path + " owned by dead " + cur_worker + ", will retry.");
          }
        } else if (getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          // With CHECK the coordination decides whether the task needs resubmitting.
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        // Log the task summary at most once every 5 seconds.
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total=" + tot + ", unassigned=" + unassigned + ", tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO may be the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (
        tot > 0 && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)
      ) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure its still there
            getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
        getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        // Work on a snapshot so the shared set is not iterated while it may be modified.
        List<String> tmpPaths = new ArrayList<>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        // NOTE(review): deleteTask is asynchronous, so a path that fails again and is re-added
        // before this removeAll runs would be dropped from the retry set - confirm whether that
        // can happen with the coordination implementation in use.
        failedDeletions.removeAll(tmpPaths);
      }
    }
  }
648
649  public enum ResubmitDirective {
650    CHECK(),
651    FORCE()
652  }
653
654  public enum TerminationStatus {
655    IN_PROGRESS("in_progress"),
656    SUCCESS("success"),
657    FAILURE("failure"),
658    DELETED("deleted");
659
660    final String statusMsg;
661
662    TerminationStatus(String msg) {
663      statusMsg = msg;
664    }
665
666    @Override
667    public String toString() {
668      return statusMsg;
669    }
670  }
671}