/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

import com.google.common.annotations.VisibleForTesting;
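
/*
 * Minimal usage sketch (illustrative only; the server, conf, stopper, masterServices,
 * serverName and deadServerWalDirs values are assumed to come from a running master):
 *
 *   SplitLogManager slm = new SplitLogManager(server, conf, stopper, masterServices, serverName);
 *   long bytesSplit = slm.splitLogDistributed(deadServerWalDirs); // blocks until all WALs split
 *   slm.stop();
 */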

/**
 * Distributes the task of splitting the write-ahead logs of dead region servers among the
 * available split log workers. Coordination happens through the
 * {@link SplitLogManagerCoordination} obtained from the server's coordinated state manager.
 *
 * <p>For every log file that has to be split a task is submitted to the coordination engine and
 * tracked in an in-memory task map. A timeout monitor chore periodically resubmits tasks whose
 * workers have died or made no progress, retries failed task deletions, and cleans up left-over
 * recovering-region state.
 *
 * <p>Callers use {@link #splitLogDistributed(Path)} or {@link #splitLogDistributed(List)} to
 * split a region server's log files; the calling thread blocks until all log files in the batch
 * have been split or an error is encountered.
 */
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  private Server server;

  private final Stoppable stopper;
  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;
  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
  private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
      .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());

  /**
   * Guards the coordination calls that mark regions as recovering and those that remove
   * recovering-region state, so the two kinds of update do not interleave.
   */
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * Wires this manager to the split log coordination of the given server and starts the timeout
   * monitor chore that watches over outstanding split tasks.
   * @param server server instance providing the coordinated state manager
   * @param conf the HBase configuration
   * @param stopper used to check whether the service is being stopped
   * @param master the master services
   * @param serverName name of the master server, used to name the chore service
   * @throws IOException
   */
  public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
      MasterServices master, ServerName serverName) throws IOException {
    this.server = server;
    this.conf = conf;
    this.stopper = stopper;
    this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
    if (server.getCoordinatedStateManager() != null) {
      SplitLogManagerCoordination coordination =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details =
          new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
      coordination.setDetails(details);
      coordination.init();
    }
    this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
    this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
            stopper);
    choreService.scheduleChore(timeoutMonitor);
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Gets the WAL files that need to be split from the given server log directories, applying the
   * optional filter. Directories that do not exist are skipped with a warning. See
   * {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on the directory
   * layout.
   * @param conf the HBase configuration
   * @param logDirs the per-server WAL directories to scan
   * @param filter optional filter to select specific files, may be null
   * @return the matching log files, possibly an empty array
   */
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter)
      throws IOException {
    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info(logDir + " is empty dir, no logs to split");
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

  /**
   * Splits the log files of a single dead region server.
   * @param logDir one region server's WAL directory
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region servers have been
   * processed - successfully split or an error is encountered - by an available worker region
   * server. This method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<ServerName>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore directories whose name does not encode a server name
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the given log files have been processed - successfully split
   * or an error is encountered - by an available worker region server. This method must only be
   * called after the region servers have been brought online.
   * @param serverNames the region servers whose logs are being split
   * @param logDirs List of log dirs to split
   * @param filter the Path filter to select specific files for considering, may be null
   * @return cumulative size of the logfiles split
   * @throws IOException If there was an error while splitting any log file
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    FileStatus[] logfiles = getFileList(logDirs, filter);
    status.setStatus("Checking directory contents...");
    SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
    LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
      " for " + serverNames);
    long t = EnvironmentEdgeManager.currentTime();
    long totalSize = 0;
    TaskBatch batch = new TaskBatch();
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      // A file that is still being written to (typically the last WAL of the dead server) may
      // report a length of zero here, so totalSize can under-count until its lease is recovered.
      totalSize += lf.getLen();
      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);
    // Meta and user region logs are split in separate batches, so a filter that matches the meta
    // filter means this batch recovered only the meta region.
    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      isMetaRecovery = true;
    }
    removeRecoveringRegions(serverNames, isMetaRecovery);

    if (batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
    }
    String msg =
        "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
            + " log files in " + logDirs + " in "
            + (EnvironmentEdgeManager.currentTime() - t) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine so that a split log worker can pick it up
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

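  /**
   * Blocks until every task in the batch is done or errored, logging progress and bailing out if
   * the manager is stopped or the waiting thread is interrupted.
   */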
  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks =
              ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more task remaining, splitting "
                + "should have completed. Remaining tasks is " + remainingTasks
                + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(100);
          if (stopper.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * Removes the recovering-region markers for the given servers so that the region servers
   * hosting the recovered regions can serve them again.
   * @param serverNames servers whose logs have just been recovered
   * @param isMetaRecovery true if this batch recovered only the meta region, false if only user
   *          regions, null if the batch was not filtered
   */
  private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
    if (!isLogReplaying()) {
      // recovering-region markers are only maintained in distributed log replay mode
      return;
    }
    if (serverNames == null || serverNames.isEmpty()) return;

    Set<String> recoveredServerNameSet = new HashSet<String>();
    for (ServerName tmpServerName : serverNames) {
      recoveredServerNameSet.add(tmpServerName.getServerName());
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
            isMetaRecovery);
    } catch (IOException e) {
      LOG.warn("removeRecoveringRegions got exception. Will retry", e);
      if (serverNames != null && !serverNames.isEmpty()) {
        this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
            isMetaRecovery));
      }
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * Removes recovering-region state that is not accounted for by the given set of known failed
   * servers.
   * @param failedServers a set of known failed servers
   * @throws IOException
   */
  void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
      InterruptedIOException {
    Set<String> knownFailedServers = new HashSet<String>();
    if (failedServers != null) {
      for (ServerName tmpServerName : failedServers) {
        knownFailedServers.add(tmpServerName.getServerName());
      }
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * @param path the task node path
   * @param batch the batch this task should be attached to
   * @return null on success, otherwise the existing task that could not be taken over
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // the new task was not installed; decide what to do with the existing one
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // falls through to the warning below because the status is still not DELETED
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // the old task has been deleted; try to install the new task for this batch
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.fatal("Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

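  /**
   * Returns the in-memory task for the given path, creating and counting an orphan task (one not
   * attached to any batch) if none exists yet.
   */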
  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task;
    task = tasks.putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

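  /**
   * Shuts down the chore service and cancels the timeout monitor.
   */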
  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("dead splitlog worker " + workerName);
  }

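  /**
   * Records a whole set of dead split log workers; their tasks are resubmitted by the
   * TimeoutMonitor chore.
   */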
  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("dead splitlog workers " + serverNames);
  }

  /**
   * Sets the log recovery mode (log splitting vs. distributed log replay) in the coordination
   * engine.
   * @param isForInitialization true when called during master initialization
   * @throws IOException if the recovery mode cannot be set
   */
  public void setRecoveryMode(boolean isForInitialization) throws IOException {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
  }

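  /**
   * Marks the given regions of a failed region server as recovering in the coordination engine
   * so that they are not served until their edits have been replayed. Only relevant in
   * distributed log replay mode.
   * @param server the failed region server whose regions are being recovered
   * @param userRegions the user regions that were assigned to that server
   */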
  public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
      throws InterruptedIOException, IOException {
    if (userRegions == null || (!isLogReplaying())) {
      return;
    }
    try {
      this.recoveringRegionLock.lock();
      // hold the lock so recovering-region updates do not interleave with removals
      ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * @return true if the current log recovery mode is distributed log replay
   */
  public boolean isLogReplaying() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isReplaying();
  }

  /**
   * @return true if the current log recovery mode is log splitting
   */
  public boolean isLogSplitting() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isSplitting();
  }

  /**
   * @return the current log recovery mode
   */
  public RecoveryMode getRecoveryMode() {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().getRecoveryMode();
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // Don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a split log worker is free to pick up
        // a task, because a worker handles only one task at a time.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
          if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then nudge the workers through the coordination engine.
      // The unassigned timeout is of the order of minutes because it is very
      // unlikely that every worker hit a transient error while trying to grab
      // a task; if there are no workers at all, tasks would otherwise stay
      // unassigned indefinitely.
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // Re-check isUnassigned() because tasks may have been assigned
          // asynchronously since the first pass; no locking is needed, an
          // unnecessary checkTaskStillAvailable() call is harmless.
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // verify the task is still present in the coordination engine
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed task deletions
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteTask is an asynchronous call
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }

      // Garbage collect left-over recovering-region state once there is no splitting work and
      // enough time has passed since the last recovery, and retry any previously failed removals.
      long timeInterval =
          EnvironmentEdgeManager.currentTime()
              - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().getLastRecoveryTime();
      if (!failedRecoveringRegionDeletions.isEmpty()
          || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
        // removeRecoveringRegions() does further checks before removing anything
        if (!failedRecoveringRegionDeletions.isEmpty()) {
          List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
              new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
          for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
            removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
          }
        } else {
          removeRecoveringRegions(null, null);
        }
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE();
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}