/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via the coordination engine. For every log file that has to be split a
 * task is created. SplitLogWorkers race to grab a task.
 *
 * <p>SplitLogManager monitors the tasks that it creates using the timeout monitor chore.
 * If a task's progress is slow then
 * {@link SplitLogManagerCoordination#checkTasks} will take away the
 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
 * and the task will be up for grabs again. When the task is done then it is
 * deleted by SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * <p>All the coordination calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that an old "delete task"
 * can delete the re-submission.
 */
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  private Server server;

  private final Stoppable stopper;
  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000);

  private long unassignedTimeout;
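  // Time of the most recent task creation; the TimeoutMonitor compares against this plus
  // unassignedTimeout before re-checking tasks that no worker has picked up.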
  private long lastTaskCreateTime = Long.MAX_VALUE;
  private long checkRecoveringTimeThreshold = 15000;
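  // Failed deletions of recovering-region markers; the TimeoutMonitor chore retries these
  // (see removeRecoveringRegions() and chore() below).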
  private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
      .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());

  /**
   * In distributedLogReplay mode we need to touch both the split-log state and the
   * recovering-regions state in one operation, so this lock guards such cases.
   */
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

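  // Workers reported dead since the last TimeoutMonitor run; guarded by deadWorkersLock and
  // drained by the monitor, which force-resubmits their tasks.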
  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * It is OK to construct this object even when region servers are not online. It does look up
   * orphan tasks in the coordination engine but it does not block waiting for them to be done.
   * @param server the server instance
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything goes wrong
   * @param master the master services
   * @param serverName this master's server name
   * @throws IOException
   */
  public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
      MasterServices master, ServerName serverName) throws IOException {
    this.server = server;
    this.conf = conf;
    this.stopper = stopper;
    this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
    if (server.getCoordinatedStateManager() != null) {
      SplitLogManagerCoordination coordination =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details =
          new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
      coordination.setDetails(details);
      coordination.init();
    }
    this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
    this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
            stopper);
    choreService.scheduleChore(timeoutMonitor);
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of paths that need to be split given a set of server-specific directories and
   * optionally a filter.
   *
   * <p>See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on
   * directory layout.
   *
   * <p>Should be package-private, but it is exposed as public static for
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
   *     Configuration, WALFactory)} and for tests.
   */
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter)
      throws IOException {
    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info(logDir + " is empty dir, no logs to split");
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

  /**
   * Splits the WAL files found in a single region server's log directory.
   * @param logDir one region server WAL directory path
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed,
   * either successfully split or an error is encountered, by an available worker region server.
   * This method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<ServerName>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore invalid format error
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the given log files have been processed, either successfully
   * split or an error is encountered, by an available worker region server. This method must only
   * be called after the region servers have been brought online.
   * @param serverNames servers whose WALs are being split
   * @param logDirs List of log dirs to split
   * @param filter the PathFilter to select specific files for consideration
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
        logDirs + " for serverName=" + serverNames);
    FileStatus[] logfiles = getFileList(logDirs, filter);
    status.setStatus("Checking directory contents...");
    LOG.debug("Scheduling batch of logs to split");
    SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
    LOG.info("started splitting " + logfiles.length + " logs in " + logDirs +
        " for " + serverNames);
    long t = EnvironmentEdgeManager.currentTime();
    long totalSize = 0;
    TaskBatch batch = new TaskBatch();
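    // isMetaRecovery is a tri-state flag passed to removeRecoveringRegions() below: null when no
    // filter was given (mixed/unknown logs), false for a user-region batch, and flipped to true
    // after the split when the meta filter was used.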
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      // TODO If the log file is still being written to, which is most likely the case for the
      // last log file, then its length will show up here as zero. The size of such a file can
      // only be retrieved after recover-lease is done, so totalSize (and the metrics it drives)
      // will be under-reported in most cases.
      totalSize += lf.getLen();
      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);
    // remove recovering regions
    if (filter == MasterFileSystem.META_FILTER) {
      // Meta WALs and user-region WALs are split in separate batches, so the log files here are
      // either all for meta or all for user regions (mixed batches only happen in tests).
      isMetaRecovery = true;
    }
    removeRecoveringRegions(serverNames, isMetaRecovery);

    if (batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("returning success without actually splitting and "
              + "deleting all the log files in path " + logDir);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
    }
    String msg =
        "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
            + " log files in " + logDirs + " in "
            + (EnvironmentEdgeManager.currentTime() - t) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to the coordination engine if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
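      // Loop until every installed task has reported done or error. The coordination callbacks
      // update the batch counters and notify on the batch monitor, so waiting on it with a short
      // timeout lets us both react to progress and periodically sanity-check the task counts.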
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks =
              ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more tasks remaining, splitting "
                + "should have completed. Remaining tasks is " + remainingTasks
                + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(100);
          if (stopper.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * Removes the recovering-region markers for the given servers so that the region servers
   * hosting the recovered regions can serve reads again. Only used in log-replay mode.
   * @param serverNames servers whose WALs have just been recovered
   * @param isMetaRecovery true if the recovery was for hbase:meta, false for user regions,
   *          null when unknown
   */
  private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
    if (!isLogReplaying()) {
      // recovering-region markers are only used in distributedLogReplay mode
      return;
    }

    Set<String> recoveredServerNameSet = new HashSet<String>();
    if (serverNames != null) {
      for (ServerName tmpServerName : serverNames) {
        recoveredServerNameSet.add(tmpServerName.getServerName());
      }
    }

    try {
      this.recoveringRegionLock.lock();
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
            isMetaRecovery);
    } catch (IOException e) {
      LOG.warn("removeRecoveringRegions got exception. Will retry", e);
      if (serverNames != null && !serverNames.isEmpty()) {
        this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
            isMetaRecovery));
      }
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * Removes stale recovering-region markers left over from servers that are not in the given set
   * of known failed servers. Called during master initialization.
   * @param failedServers the set of currently known failed servers
   * @throws IOException
   */
  void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
      InterruptedIOException {
    Set<String> knownFailedServers = new HashSet<String>();
    if (failedServers != null) {
      for (ServerName tmpServerName : failedServers) {
        knownFailedServers.add(tmpServerName.getServerName());
      }
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * Creates a new task for the given path and installs it in the batch, unless a conflicting
   * task already exists.
   * @return null on success, or the pre-existing task that could not be taken over
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this task because it might
          // be too late for setDone() to update batch.done. There is no need for the batch
          // creator to wait for this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to exit the loop and report the failure below
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the new task; it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.fatal("Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

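  /**
   * Returns the task for the given path, creating a new orphan task (one with no batch attached)
   * if none exists yet.
   */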
  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task;
    task = tasks.putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

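  /**
   * Shuts down the chore service and cancels the timeout monitor.
   */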
  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // Only record the dead worker here; the TimeoutMonitor chore force-resubmits its tasks on
    // its own thread, so the caller (for example a coordination event handler) is never blocked.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("dead splitlog worker " + workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("dead splitlog workers " + serverNames);
  }

  /**
   * Sets the log recovery mode (log splitting or log replay) in the coordination engine, taking
   * any outstanding split-log tasks and the current configuration into account.
   * @param isForInitialization true when called during master initialization
   * @throws IOException if the recovery mode cannot be set
   */
  public void setRecoveryMode(boolean isForInitialization) throws IOException {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
  }

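  /**
   * Marks the given regions as recovering in the coordination engine so that their WALs are
   * replayed before they serve reads again. No-op unless log replay is enabled.
   * @param server the failed region server whose WALs are to be replayed
   * @param userRegions the regions to mark as recovering
   */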
  public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
      throws InterruptedIOException, IOException {
    if (userRegions == null || (!isLogReplaying())) {
      return;
    }
    try {
      this.recoveringRegionLock.lock();
      // mark that we're creating recovering regions
      ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * @return whether log is replaying
   */
  public boolean isLogReplaying() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isReplaying();
  }

  /**
   * @return whether log is splitting
   */
  public boolean isLogSplitting() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isSplitting();
  }

  /**
   * @return the current log recovery mode
   */
  public RecoveryMode getRecoveryMode() {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().getRecoveryMode();
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile int incarnation;
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      incarnation = 0;
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

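      // Atomically take ownership of the current set of dead workers; tasks owned by these
      // workers are force-resubmitted in the loop below.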
      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // Don't easily resubmit a task which hasn't been picked up yet. It might be a long
        // while before a SplitLogWorker is free to pick up a task, because a SplitLogWorker
        // picks up only one task at a time.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
          if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
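        // throttle the summary log line to at most once every 5 seconds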
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }

      // If there are pending tasks but none of them has been assigned for longer than the
      // unassigned timeout, verify that each unassigned task still exists in the coordination
      // engine and then ask the coordination engine to re-check (and if needed resubmit) them.
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // We have to do the task.isUnassigned() check again because tasks might have been
          // asynchronously assigned. No locking is required for these checks; it is OK even if
          // checkTaskStillAvailable() is called unnecessarily for a task.
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // just touch the task to make sure it is still there
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // retry previously failed deletions of task entries
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteTask is an asynchronous call
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }

      // Garbage collect left-over recovering-region state when there has been no recovery
      // activity for longer than checkRecoveringTimeThreshold.
      long timeInterval =
          EnvironmentEdgeManager.currentTime()
              - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().getLastRecoveryTime();
      if (!failedRecoveringRegionDeletions.isEmpty()
          || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
        // retry the previously failed deletions first; otherwise do a general cleanup pass
        if (!failedRecoveringRegionDeletions.isEmpty()) {
          List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
              new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
          for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
            removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
          }
        } else {
          removeRecoveringRegions(null, null);
        }
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE();
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}