/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing permissions
 * and limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

import com.google.common.annotations.VisibleForTesting;

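/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via the {@link SplitLogManagerCoordination} obtained from the server's
 * coordinated state manager: for every WAL file that has to be split a {@link Task} is created
 * and submitted, and split-log workers race to grab it.
 *
 * <p>All outstanding tasks are kept in an in-memory map, and the {@link TimeoutMonitor} chore
 * resubmits tasks owned by dead workers, nudges tasks that have stayed unassigned for too long,
 * retries failed task deletions and cleans up recovering-region state.
 *
 * <p>Callers use {@link #splitLogDistributed(Path)} or one of its overloads, which block until
 * every WAL file under the given directories has been split.
 */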
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  private Server server;

  private final Stoppable stopper;
  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 minutes

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;
  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
  private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
      .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());

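  /**
   * Guards coordination calls that touch recovering-region state; see
   * {@link #markRegionsRecovering}, {@link #removeRecoveringRegions} and
   * {@link #removeStaleRecoveringRegions}.
   */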
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

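  /**
   * Creates the SplitLogManager, wires up the {@link SplitLogManagerCoordination} when a
   * coordinated state manager is available, and starts the {@link TimeoutMonitor} chore.
   * @param server the server instance
   * @param conf the HBase configuration
   * @param stopper used to check whether we should stop waiting for splits to complete
   * @param master the master services
   * @param serverName name of the master, used to label the chore service
   * @throws IOException
   */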
  public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
      MasterServices master, ServerName serverName) throws IOException {
    this.server = server;
    this.conf = conf;
    this.stopper = stopper;
    this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
    if (server.getCoordinatedStateManager() != null) {
      SplitLogManagerCoordination coordination =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details =
          new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
      coordination.setDetails(details);
      coordination.init();
    }
    this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
    this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
            stopper);
    choreService.scheduleChore(timeoutMonitor);
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

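  /**
   * Returns the WAL files found under the given log directories, optionally restricted by the
   * supplied {@link PathFilter}. Directories that do not exist or are empty are logged and
   * skipped rather than treated as errors.
   * @param conf the HBase configuration
   * @param logDirs the directories to scan
   * @param filter restricts the returned files, may be null
   * @throws IOException if a directory cannot be read
   */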
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter)
      throws IOException {
    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info(logDir + " is empty dir, no logs to split");
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

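  /**
   * @param logDir one region server's WAL directory
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */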
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

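  /**
   * The caller blocks until every log file under the given directories has been processed,
   * i.e. either split successfully or failed, by the available split-log workers.
   * @param logDirs list of WAL directories to split
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */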
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<ServerName>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // not a valid WAL directory name; nothing to add to serverNames
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

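  /**
   * The caller blocks until every log file that matches the filter has been processed,
   * i.e. either split successfully or failed, by the available split-log workers.
   * @param serverNames servers whose WALs are being split; used to clean up recovering regions
   * @param logDirs list of WAL directories to split
   * @param filter selects specific files to split, or null for all files
   * @return cumulative size of the logfiles split
   * @throws IOException if there was an error while splitting any log file
   */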
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
        logDirs + " for serverName=" + serverNames);
    FileStatus[] logfiles = getFileList(logDirs, filter);
    status.setStatus("Checking directory contents...");
    SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
    LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
        " for " + serverNames);
    long t = EnvironmentEdgeManager.currentTime();
    long totalSize = 0;
    TaskBatch batch = new TaskBatch();
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      // the length of a WAL that is still being written to may show up as zero here, so
      // totalSize can under-report the amount of data actually split
      totalSize += lf.getLen();
      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);

    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      // this batch was restricted to meta WALs, so flag the recovering-region cleanup
      // below as a meta recovery
      isMetaRecovery = true;
    }
    removeRecoveringRegions(serverNames, isMetaRecovery);

    if (batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("Returning success without actually splitting and "
              + "deleting all the log files in path " + logDir + ": "
              + Arrays.toString(files), ioe);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
    }
    String msg =
        "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
            + " log files in " + logDirs + " in "
            + (EnvironmentEdgeManager.currentTime() - t) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

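  /**
   * Add a task entry to the coordination engine if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there
   */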
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine so that workers can pick it up
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks =
              ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more tasks remaining, splitting "
                + "should have completed. Remaining tasks is " + remainingTasks
                + ", active tasks in map " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
          batch.wait(100);
          if (stopper.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

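  /**
   * Asks the coordination engine to clear the recovering-region markers left behind for the
   * given servers so that the regions are no longer marked as recovering. Only relevant in
   * log-replay mode; failed removals are queued and retried by the {@link TimeoutMonitor}.
   * @param serverNames servers whose logs have just been recovered
   * @param isMetaRecovery whether the current recovery covered only meta regions
   */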
  private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
    if (!isLogReplaying()) {
      // recovering-region markers are only created in log-replay mode; nothing to clean up
      return;
    }
    if (serverNames == null || serverNames.isEmpty()) return;

    Set<String> recoveredServerNameSet = new HashSet<String>();
    for (ServerName tmpServerName : serverNames) {
      recoveredServerNameSet.add(tmpServerName.getServerName());
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
            isMetaRecovery);
    } catch (IOException e) {
      LOG.warn("removeRecoveringRegions got exception. Will retry", e);
      if (serverNames != null && !serverNames.isEmpty()) {
        this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
            isMetaRecovery));
      }
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

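  /**
   * Removes stale recovering-region markers, i.e. markers that do not belong to any server in
   * <code>failedServers</code>.
   * @param failedServers a set of known failed servers, may be null
   * @throws IOException
   */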
  void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
      InterruptedIOException {
    Set<String> knownFailedServers = new HashSet<String>();
    if (failedServers != null) {
      for (ServerName tmpServerName : failedServers) {
        knownFailedServers.add(tmpServerName.getServerName());
      }
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

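  /**
   * Creates a new {@link Task} for the given path unless one already exists, adopting orphan
   * tasks where possible.
   * @param path the path of the log to be split
   * @param batch the batch this task belongs to
   * @return null if the caller now owns the task (newly created, adopted orphan or already
   *         successful), otherwise the pre-existing task that could not be taken over
   */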
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only ever incremented in this method
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // a task for this path already exists; it can only be taken over if it is an orphan
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // the orphan task for this path already completed successfully; nothing more to do
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // stop waiting and re-check the status below
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // the failed task has been deleted; try to install the new task again
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.fatal("Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task;
    task = tasks.putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // only record the worker as dead here; the TimeoutMonitor chore resubmits the tasks
    // that the dead worker owned
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("dead splitlog worker " + workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("dead splitlog workers " + serverNames);
  }

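  /**
   * Sets the recovery mode (see {@link RecoveryMode}) through the coordination engine, either
   * from outstanding split log tasks or from the current configuration.
   * @param isForInitialization true when called as part of master initialization
   * @throws IOException if the recovery mode cannot be set
   */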
  public void setRecoveryMode(boolean isForInitialization) throws IOException {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
  }

  public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
      throws InterruptedIOException, IOException {
    if (userRegions == null || (!isLogReplaying())) {
      return;
    }
    try {
      this.recoveringRegionLock.lock();
      // mark the given regions as recovering via the coordination engine
      ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

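  /**
   * @return whether log is replaying
   */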
  public boolean isLogReplaying() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isReplaying();
  }

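  /**
   * @return whether log is splitting
   */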
  public boolean isLogSplitting() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isSplitting();
  }

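  /**
   * @return the current log recovery mode
   */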
  public RecoveryMode getRecoveryMode() {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().getRecoveryMode();
  }

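  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */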
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

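  /**
   * In memory state of an active task.
   */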
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile AtomicInteger incarnation = new AtomicInteger(0);
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

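  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */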
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't resubmit a task that hasn't been picked up yet; such tasks are handled by the
        // unassigned-timeout check further below
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
          if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If every pending task has stayed unassigned for longer than unassignedTimeout, ask the
      // coordination engine to re-check each task and resubmit the unassigned ones so that
      // splitting can make progress again.
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // re-check isUnassigned() because a worker may have grabbed the task asynchronously
          // since the pass above
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // verify that the task is still present in the coordination engine
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // retry task deletions that failed earlier
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }

      // Retry recovering-region removals that failed earlier, and attempt a cleanup pass when
      // no split work has been running for a while.
      long timeInterval =
          EnvironmentEdgeManager.currentTime()
              - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().getLastRecoveryTime();
      if (!failedRecoveringRegionDeletions.isEmpty()
          || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
        if (!failedRecoveringRegionDeletions.isEmpty()) {
          List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
              new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
          for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
            removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
          }
        } else {
          removeRecoveringRegions(null, null);
        }
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE();
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}