1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.Server;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.SplitLogCounters;
53  import org.apache.hadoop.hbase.Stoppable;
54  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
55  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
56  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
57  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
58  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
59  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
60  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
61  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
62  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63  import org.apache.hadoop.hbase.util.FSUtils;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.hbase.util.Threads;
66  
67  import com.google.common.annotations.VisibleForTesting;
68  
69  /**
70   * Distributes the task of log splitting to the available region servers.
71   * Coordination happens via coordination engine. For every log file that has to be split a
72   * task is created. SplitLogWorkers race to grab a task.
73   *
74   * <p>SplitLogManager monitors the tasks that it creates using the
75   * timeoutMonitor thread. If a task's progress is slow then
76   * {@link SplitLogManagerCoordination#checkTasks} will take away the
77   * task from the owner {@link SplitLogWorker} and the task will be up for grabs again. When the
78   * task is done then it is deleted by SplitLogManager.
79   *
80   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
81   * log files. The caller thread waits in this method until all the log files
82   * have been split.
83   *
84   * <p>All the coordination calls made by this class are asynchronous. This is mainly
85   * to help reduce response time seen by the callers.
86   *
87   * <p>There is a race in this design between the SplitLogManager and the
88   * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
89   * already been completed by a SplitLogWorker. We rely on the idempotency of
90   * the log splitting task for correctness.
91   *
92   * <p>It is also assumed that every log splitting task is unique and once
93   * completed (either with success or with error) it will not be submitted
94   * again. If a task is resubmitted then there is a risk that the old "delete task"
95   * can delete the re-submission.
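     *
     * <p>A minimal usage sketch (illustrative only; {@code splitLogManager} is an instance of this
     * class and {@code logDir} is assumed to point at a dead region server's directory under .logs):
     * <pre>
     * {@code
     * Path logDir = ...; // e.g. <hbase.rootdir>/.logs/<dead-server-name>
     * long splitBytes = splitLogManager.splitLogDistributed(logDir);
     * }
     * </pre>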
96   */
97  @InterfaceAudience.Private
98  public class SplitLogManager {
99    private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
100 
101   private Server server;
102 
103   private final Stoppable stopper;
104   private FileSystem fs;
105   private Configuration conf;
106 
107   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
108 
109   private long unassignedTimeout;
110   private long lastTaskCreateTime = Long.MAX_VALUE;
111   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
112   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
113       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
114 
115   /**
116    * In distributedLogReplay mode, we need to touch both the splitlog and recovering-regions
117    * znodes in one operation, so this lock guards such combined updates.
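       *
       * <p>A sketch of the locking pattern used elsewhere in this class (see
       * {@link #markRegionsRecovering} and {@link #removeStaleRecoveringRegions}):
       * <pre>
       * {@code
       * recoveringRegionLock.lock();
       * try {
       *   // update splitlog and recovering-regions state as one logical operation
       * } finally {
       *   recoveringRegionLock.unlock();
       * }
       * }
       * </pre>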
118    */
119   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
120 
121   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
122   private TimeoutMonitor timeoutMonitor;
123 
124   private volatile Set<ServerName> deadWorkers = null;
125   private final Object deadWorkersLock = new Object();
126 
127   /**
128    * It is OK to construct this object even when region servers are not online. It does look up
129    * orphan tasks in the coordination engine, but it does not block waiting for them to be done.
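       *
       * <p>Typical construction, as a sketch (the collaborators are assumed to be supplied by the
       * running master):
       * <pre>
       * {@code
       * SplitLogManager slm =
       *     new SplitLogManager(server, conf, stopper, masterServices, serverName);
       * }
       * </pre>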
130    * @param server the server instance
131    * @param conf the HBase configuration
132    * @param stopper the stoppable in case anything is wrong
133    * @param master the master services
134    * @param serverName the master server name
135    * @throws IOException
136    */
137   public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
138       MasterServices master, ServerName serverName) throws IOException {
139     this.server = server;
140     this.conf = conf;
141     this.stopper = stopper;
142     if (server.getCoordinatedStateManager() != null) {
143       SplitLogManagerCoordination coordination =
144           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
145               .getSplitLogManagerCoordination();
146       Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
147       SplitLogManagerDetails details =
148           new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
149       coordination.init();
150       coordination.setDetails(details);
151       // Determine recovery mode
152     }
153     this.unassignedTimeout =
154         conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
155     this.timeoutMonitor =
156         new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
157             stopper);
158     Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
159         + ".splitLogManagerTimeoutMonitor");
160   }
161 
162   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
163     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
164     for (Path hLogDir : logDirs) {
165       this.fs = hLogDir.getFileSystem(conf);
166       if (!fs.exists(hLogDir)) {
167         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
168         continue;
169       }
170       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
171       if (logfiles == null || logfiles.length == 0) {
172         LOG.info(hLogDir + " is empty dir, no logs to split");
173       } else {
174         for (FileStatus status : logfiles)
175           fileStatus.add(status);
176       }
177     }
178     FileStatus[] a = new FileStatus[fileStatus.size()];
179     return fileStatus.toArray(a);
180   }
181 
182   /**
183    * @param logDir one region server HLog dir path in .logs
184    * @throws IOException if there was an error while splitting any log file
185    * @return cumulative size of the logfiles split
187    */
188   public long splitLogDistributed(final Path logDir) throws IOException {
189     List<Path> logDirs = new ArrayList<Path>();
190     logDirs.add(logDir);
191     return splitLogDistributed(logDirs);
192   }
193 
194   /**
195    * The caller will block until all the log files of the given region server have been processed -
196    * successfully split or an error is encountered - by an available worker region server. This
197    * method must only be called after the region servers have been brought online.
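       *
       * <p>For example (an illustrative sketch; the paths are assumed to be per-region-server HLog
       * directories under .logs):
       * <pre>
       * {@code
       * List<Path> logDirs = new ArrayList<Path>();
       * logDirs.add(logDirOfDeadServer1);
       * logDirs.add(logDirOfDeadServer2);
       * long splitBytes = splitLogDistributed(logDirs);
       * }
       * </pre>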
198    * @param logDirs List of log dirs to split
199    * @throws IOException If there was an error while splitting any log file
200    * @return cumulative size of the logfiles split
201    */
202   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
203     if (logDirs.isEmpty()) {
204       return 0;
205     }
206     Set<ServerName> serverNames = new HashSet<ServerName>();
207     for (Path logDir : logDirs) {
208       try {
209         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
210         if (serverName != null) {
211           serverNames.add(serverName);
212         }
213       } catch (IllegalArgumentException e) {
214         // ignore invalid format error.
215         LOG.warn("Cannot parse server name from " + logDir);
216       }
217     }
218     return splitLogDistributed(serverNames, logDirs, null);
219   }
220 
221   /**
222    * The caller will block until all the log files of the given region servers selected by the
223    * given filter have been processed - successfully split or an error is encountered - by an
224    * available worker region server. This method must only be called after the region servers are online.
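       *
       * <p>For example, splitting only the hbase:meta logs of a set of servers could look like this
       * (a sketch; {@code serverNames} and {@code logDirs} are assumed to describe the dead servers):
       * <pre>
       * {@code
       * splitLogDistributed(serverNames, logDirs, MasterFileSystem.META_FILTER);
       * }
       * </pre>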
225    * @param logDirs List of log dirs to split
226    * @param filter the PathFilter used to select specific log files to split; may be null
227    * @throws IOException If there was an error while splitting any log file
228    * @return cumulative size of the logfiles split
229    */
230   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
231       PathFilter filter) throws IOException {
232     MonitoredTask status =
233         TaskMonitor.get().createStatus("Doing distributed log split in " + logDirs);
234     FileStatus[] logfiles = getFileList(logDirs, filter);
235     status.setStatus("Checking directory contents...");
236     LOG.debug("Scheduling batch of logs to split");
237     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
238     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
239     long t = EnvironmentEdgeManager.currentTime();
240     long totalSize = 0;
241     TaskBatch batch = new TaskBatch();
242     Boolean isMetaRecovery = (filter == null) ? null : false;
243     for (FileStatus lf : logfiles) {
244       // TODO If the log file is still being written to - which is most likely
245       // the case for the last log file - then its length will show up here
246       // as zero. The size of such a file can only be retrieved after
247       // recover-lease is done. totalSize will be an underestimate in most cases and the
248       // metrics that it drives will also be under-reported.
249       totalSize += lf.getLen();
250       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
251       if (!enqueueSplitTask(pathToLog, batch)) {
252         throw new IOException("duplicate log split scheduled for " + lf.getPath());
253       }
254     }
255     waitForSplittingCompletion(batch, status);
256     // remove recovering regions
257     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
258       // we split meta and user regions separately, therefore the log files are either all for
259       // meta or all for user regions, never both (mixed situations can only occur in tests)
260       isMetaRecovery = true;
261     }
262     removeRecoveringRegions(serverNames, isMetaRecovery);
263 
264     if (batch.done != batch.installed) {
265       batch.isDead = true;
266       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
267       LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
268           + " but only " + batch.done + " done");
269       String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
270       status.abort(msg);
271       throw new IOException(msg);
272     }
273     for (Path logDir : logDirs) {
274       status.setStatus("Cleaning up log directory...");
275       try {
276         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
277           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
278         }
279       } catch (IOException ioe) {
280         FileStatus[] files = fs.listStatus(logDir);
281         if (files != null && files.length > 0) {
282           LOG.warn("returning success without actually splitting and "
283               + "deleting all the log files in path " + logDir);
284         } else {
285           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
286         }
287       }
288       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
289     }
290     String msg =
291         "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
292             + " log files in " + logDirs + " in "
293             + (EnvironmentEdgeManager.currentTime() - t) + "ms";
294     status.markComplete(msg);
295     LOG.info(msg);
296     return totalSize;
297   }
298 
299   /**
300    * Add a task entry to coordination if it is not already there.
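       *
       * <p>Sketch of how {@link #splitLogDistributed(Set, List, PathFilter)} drives this method:
       * <pre>
       * {@code
       * TaskBatch batch = new TaskBatch();
       * if (!enqueueSplitTask(pathToLog, batch)) {
       *   // a split task for this log was already scheduled
       * }
       * waitForSplittingCompletion(batch, status);
       * }
       * </pre>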
301    * @param taskname the path of the log to be split
302    * @param batch the batch this task belongs to
303    * @return true if a new entry is created, false if it is already there.
304    */
305   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
306     lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
307     String task =
308         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
309             .getSplitLogManagerCoordination().prepareTask(taskname);
310     Task oldtask = createTaskIfAbsent(task, batch);
311     if (oldtask == null) {
312       // publish the task in the coordination engine
313       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
314           .getSplitLogManagerCoordination().submitTask(task);
315       return true;
316     }
317     return false;
318   }
319 
320   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
321     synchronized (batch) {
322       while ((batch.done + batch.error) != batch.installed) {
323         try {
324           status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
325               + batch.installed + " done=" + batch.done + " error=" + batch.error);
326           int remaining = batch.installed - (batch.done + batch.error);
327           int actual = activeTasks(batch);
328           if (remaining != actual) {
329             LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
330           }
331           int remainingTasks =
332               ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
333                   .getSplitLogManagerCoordination().remainingTasksInCoordination();
334           if (remainingTasks >= 0 && actual > remainingTasks) {
335             LOG.warn("Expected at least" + actual + " tasks remaining, but actually there are "
336                 + remainingTasks);
337           }
338           if (remainingTasks == 0 || actual == 0) {
339             LOG.warn("No more task remaining, splitting "
340                 + "should have completed. Remaining tasks is " + remainingTasks
341                 + ", active tasks in map " + actual);
342             if (remainingTasks == 0 && actual == 0) {
343               return;
344             }
345           }
346           batch.wait(100);
347           if (stopper.isStopped()) {
348             LOG.warn("Stopped while waiting for log splits to be completed");
349             return;
350           }
351         } catch (InterruptedException e) {
352           LOG.warn("Interrupted while waiting for log splits to be completed");
353           Thread.currentThread().interrupt();
354           return;
355         }
356       }
357     }
358   }
359 
360   @VisibleForTesting
361   ConcurrentMap<String, Task> getTasks() {
362     return tasks;
363   }
364 
365   private int activeTasks(final TaskBatch batch) {
366     int count = 0;
367     for (Task t : tasks.values()) {
368       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
369         count++;
370       }
371     }
372     return count;
373 
374   }
375 
376   /**
377    * Removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
378    * region server hosting the region can allow reads to the recovered region.
379    * @param serverNames servers which have just been recovered
380    * @param isMetaRecovery whether the current recovery is for the meta region on
381    *          <code>serverNames</code>
382    */
383   private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
384     if (!isLogReplaying()) {
385       // the function is only used in WALEdit direct replay mode
386       return;
387     }
388 
389     Set<String> recoveredServerNameSet = new HashSet<String>();
390     if (serverNames != null) {
391       for (ServerName tmpServerName : serverNames) {
392         recoveredServerNameSet.add(tmpServerName.getServerName());
393       }
394     }
395 
396     try {
397       this.recoveringRegionLock.lock();
398       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
399           .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
400             isMetaRecovery);
401     } catch (IOException e) {
402       LOG.warn("removeRecoveringRegions got exception. Will retry", e);
403       if (serverNames != null && !serverNames.isEmpty()) {
404         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
405             isMetaRecovery));
406       }
407     } finally {
408       this.recoveringRegionLock.unlock();
409     }
410   }
411 
412   /**
413    * Removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
414    * during the master initialization phase.
415    * @param failedServers A set of known failed servers
416    * @throws IOException
417    */
418   void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
419       InterruptedIOException {
420     Set<String> knownFailedServers = new HashSet<String>();
421     if (failedServers != null) {
422       for (ServerName tmpServerName : failedServers) {
423         knownFailedServers.add(tmpServerName.getServerName());
424       }
425     }
426 
427     this.recoveringRegionLock.lock();
428     try {
429       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
430           .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
431     } finally {
432       this.recoveringRegionLock.unlock();
433     }
434   }
435 
436   /**
437    * @param path the task node path for the log to be split
438    * @param batch the batch this task belongs to
439    * @return null on success, existing task on error
440    */
441   private Task createTaskIfAbsent(String path, TaskBatch batch) {
442     Task oldtask;
443     // batch.installed is only changed via this function and
444     // a single thread touches batch.installed.
445     Task newtask = new Task();
446     newtask.batch = batch;
447     oldtask = tasks.putIfAbsent(path, newtask);
448     if (oldtask == null) {
449       batch.installed++;
450       return null;
451     }
452     // new task was not used.
453     synchronized (oldtask) {
454       if (oldtask.isOrphan()) {
455         if (oldtask.status == SUCCESS) {
456           // The task is already done. Do not install the batch for this
457           // task because it might be too late for setDone() to update
458           // batch.done. There is no need for the batch creator to wait for
459           // this task to complete.
460           return (null);
461         }
462         if (oldtask.status == IN_PROGRESS) {
463           oldtask.batch = batch;
464           batch.installed++;
465           LOG.debug("Previously orphan task " + path + " is now being waited upon");
466           return null;
467         }
468         while (oldtask.status == FAILURE) {
469           LOG.debug("wait for status of task " + path + " to change to DELETED");
470           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
471           try {
472             oldtask.wait();
473           } catch (InterruptedException e) {
474             Thread.currentThread().interrupt();
475             LOG.warn("Interrupted when waiting for znode delete callback");
476             // fall through to return failure
477             break;
478           }
479         }
480         if (oldtask.status != DELETED) {
481           LOG.warn("Failure because previously failed task"
482               + " state still present. Waiting for znode delete callback" + " path=" + path);
483           return oldtask;
484         }
485         // reinsert the newTask and it must succeed this time
486         Task t = tasks.putIfAbsent(path, newtask);
487         if (t == null) {
488           batch.installed++;
489           return null;
490         }
491         LOG.fatal("Logic error. Deleted task still present in tasks map");
492         assert false : "Deleted task still present in tasks map";
493         return t;
494       }
495       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
496       return oldtask;
497     }
498   }
499 
500   Task findOrCreateOrphanTask(String path) {
501     Task orphanTask = new Task();
502     Task task;
503     task = tasks.putIfAbsent(path, orphanTask);
504     if (task == null) {
505       LOG.info("creating orphan task " + path);
506       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
507       task = orphanTask;
508     }
509     return task;
510   }
511 
512   public void stop() {
513     if (timeoutMonitor != null) {
514       timeoutMonitor.interrupt();
515     }
516   }
517 
518   void handleDeadWorker(ServerName workerName) {
519     // Resubmit the tasks on the TimeoutMonitor thread. This makes it easier
520     // to reason about concurrency and to retry.
521     synchronized (deadWorkersLock) {
522       if (deadWorkers == null) {
523         deadWorkers = new HashSet<ServerName>(100);
524       }
525       deadWorkers.add(workerName);
526     }
527     LOG.info("dead splitlog worker " + workerName);
528   }
529 
530   void handleDeadWorkers(Set<ServerName> serverNames) {
531     synchronized (deadWorkersLock) {
532       if (deadWorkers == null) {
533         deadWorkers = new HashSet<ServerName>(100);
534       }
535       deadWorkers.addAll(serverNames);
536     }
537     LOG.info("dead splitlog workers " + serverNames);
538   }
539 
540   /**
541    * Sets the recovery mode based either on outstanding split log tasks from a previous run or on
542    * the current configuration setting.
543    * @param isForInitialization whether this is being called during master initialization
544    * @throws IOException if it is impossible to set the recovery mode
545    */
546   public void setRecoveryMode(boolean isForInitialization) throws IOException {
547     ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
548         .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
549 
550   }
551 
552   public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
553       throws InterruptedIOException, IOException {
554     if (userRegions == null || (!isLogReplaying())) {
555       return;
556     }
557     try {
558       this.recoveringRegionLock.lock();
559       // mark that we're creating recovering regions
560       ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
561           .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
562     } finally {
563       this.recoveringRegionLock.unlock();
564     }
565 
566   }
567 
568   /**
569    * @return whether log is replaying
570    */
571   public boolean isLogReplaying() {
572     if (server.getCoordinatedStateManager() == null) return false;
573     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
574         .getSplitLogManagerCoordination().isReplaying();
575   }
576 
577   /**
578    * @return whether log is splitting
579    */
580   public boolean isLogSplitting() {
581     if (server.getCoordinatedStateManager() == null) return false;
582     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
583         .getSplitLogManagerCoordination().isSplitting();
584   }
585 
586   /**
587    * @return the current log recovery mode
588    */
589   public RecoveryMode getRecoveryMode() {
590     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
591         .getSplitLogManagerCoordination().getRecoveryMode();
592   }
593 
594   /**
595    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
596    * Client threads use this object to wait for all their tasks to be done.
597    * <p>
598    * All access is synchronized.
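       *
       * <p>Waiters follow the usual monitor pattern, as in {@code waitForSplittingCompletion}
       * (InterruptedException handling elided in this sketch):
       * <pre>
       * {@code
       * synchronized (batch) {
       *   while ((batch.done + batch.error) != batch.installed) {
       *     batch.wait(100);
       *   }
       * }
       * }
       * </pre>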
599    */
600   @InterfaceAudience.Private
601   public static class TaskBatch {
602     public int installed = 0;
603     public int done = 0;
604     public int error = 0;
605     public volatile boolean isDead = false;
606 
607     @Override
608     public String toString() {
609       return ("installed = " + installed + " done = " + done + " error = " + error);
610     }
611   }
612 
613   /**
614    * In-memory state of an active task.
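       *
       * <p>A rough sketch of how a worker's progress is recorded against a task (the actual calls are
       * made from the coordination implementation; {@code version} and {@code workerName} are assumed
       * to come from the coordination engine):
       * <pre>
       * {@code
       * Task task = findOrCreateOrphanTask(path);
       * task.heartbeat(EnvironmentEdgeManager.currentTime(), version, workerName);
       * }
       * </pre>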
615    */
616   @InterfaceAudience.Private
617   public static class Task {
618     public volatile long last_update;
619     public volatile int last_version;
620     public volatile ServerName cur_worker_name;
621     public volatile TaskBatch batch;
622     public volatile TerminationStatus status;
623     public volatile int incarnation;
624     public final AtomicInteger unforcedResubmits = new AtomicInteger();
625     public volatile boolean resubmitThresholdReached;
626 
627     @Override
628     public String toString() {
629       return ("last_update = " + last_update + " last_version = " + last_version
630           + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
631           + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
632     }
633 
634     public Task() {
635       incarnation = 0;
636       last_version = -1;
637       status = IN_PROGRESS;
638       setUnassigned();
639     }
640 
641     public boolean isOrphan() {
642       return (batch == null || batch.isDead);
643     }
644 
645     public boolean isUnassigned() {
646       return (cur_worker_name == null);
647     }
648 
649     public void heartbeatNoDetails(long time) {
650       last_update = time;
651     }
652 
653     public void heartbeat(long time, int version, ServerName worker) {
654       last_version = version;
655       last_update = time;
656       cur_worker_name = worker;
657     }
658 
659     public void setUnassigned() {
660       cur_worker_name = null;
661       last_update = -1;
662     }
663   }
664 
665   /**
666    * Periodically checks all active tasks and resubmits the ones that have timed out
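       *
       * <p>The monitor is wired up in the {@code SplitLogManager} constructor, roughly:
       * <pre>
       * {@code
       * timeoutMonitor =
       *     new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
       * Threads.setDaemonThreadRunning(timeoutMonitor.getThread(),
       *     serverName + ".splitLogManagerTimeoutMonitor");
       * }
       * </pre>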
667    */
668   private class TimeoutMonitor extends Chore {
669     private long lastLog = 0;
670 
671     public TimeoutMonitor(final int period, Stoppable stopper) {
672       super("SplitLogManager Timeout Monitor", period, stopper);
673     }
674 
675     @Override
676     protected void chore() {
677       int resubmitted = 0;
678       int unassigned = 0;
679       int tot = 0;
680       boolean found_assigned_task = false;
681       Set<ServerName> localDeadWorkers;
682 
683       synchronized (deadWorkersLock) {
684         localDeadWorkers = deadWorkers;
685         deadWorkers = null;
686       }
687 
688       for (Map.Entry<String, Task> e : tasks.entrySet()) {
689         String path = e.getKey();
690         Task task = e.getValue();
691         ServerName cur_worker = task.cur_worker_name;
692         tot++;
693         // don't easily resubmit a task which hasn't been picked up yet. It
694         // might be a long while before a SplitLogWorker is free to pick up a
695         // task. This is because a SplitLogWorker picks up a task one at a
696         // time. If we want progress when there are no region servers then we
697         // will have to run a SplitLogWorker thread in the Master.
698         if (task.isUnassigned()) {
699           unassigned++;
700           continue;
701         }
702         found_assigned_task = true;
703         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
704           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
705           if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
706               .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
707             resubmitted++;
708           } else {
709             handleDeadWorker(cur_worker);
710             LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
711                 + ", will retry.");
712           }
713         } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
714             .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
715           resubmitted++;
716         }
717       }
718       if (tot > 0) {
719         long now = EnvironmentEdgeManager.currentTime();
720         if (now > lastLog + 5000) {
721           lastLog = now;
722           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
723         }
724       }
725       if (resubmitted > 0) {
726         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
727       }
728       // If there are pending tasks and all of them have been unassigned for
729       // some time then put up a RESCAN node to ping the workers.
730       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
731       // because a. it is very unlikely that every worker had a
732       // transient error when trying to grab the task b. if there are no
733       // workers then all tasks will stay unassigned indefinitely and the
734       // manager will be indefinitely creating RESCAN nodes. TODO maybe the
735       // master should spawn both a manager and a worker thread to guarantee
736       // that there is always one worker in the system
737       if (tot > 0
738           && !found_assigned_task
739           && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
740         for (Map.Entry<String, Task> e : tasks.entrySet()) {
741           String key = e.getKey();
742           Task task = e.getValue();
743           // we have to do task.isUnassigned() check again because tasks might
744           // have been asynchronously assigned. There is no locking required
745           // for these checks ... it is OK even if checkTaskStillAvailable() is
746           // called unnecessarily for a task path
747           if (task.isUnassigned() && (task.status != FAILURE)) {
748             // We just touch the znode to make sure it's still there
749             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
750                 .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
751           }
752         }
753         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
754             .getSplitLogManagerCoordination().checkTasks();
755         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
756         LOG.debug("resubmitting unassigned task(s) after timeout");
757       }
758       Set<String> failedDeletions =
759           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
760               .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
761       // Retry previously failed deletes
762       if (failedDeletions.size() > 0) {
763         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
764         for (String tmpPath : tmpPaths) {
765           // deleteNode is an async call
766           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
767               .getSplitLogManagerCoordination().deleteTask(tmpPath);
768         }
769         failedDeletions.removeAll(tmpPaths);
770       }
771 
772       // Garbage collect left-over
773       long timeInterval =
774           EnvironmentEdgeManager.currentTime()
775               - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
776                   .getSplitLogManagerCoordination().getLastRecoveryTime();
777       if (!failedRecoveringRegionDeletions.isEmpty()
778           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
779         // the function below performs more checks before garbage collecting anything
780         if (!failedRecoveringRegionDeletions.isEmpty()) {
781           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
782               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
783           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
784           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
785             removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
786           }
787         } else {
788           removeRecoveringRegions(null, null);
789         }
790       }
791     }
792   }
793 
794   public enum ResubmitDirective {
795     CHECK(), FORCE();
796   }
797 
798   public enum TerminationStatus {
799     IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
800 
801     String statusMsg;
802 
803     TerminationStatus(String msg) {
804       statusMsg = msg;
805     }
806 
807     @Override
808     public String toString() {
809       return statusMsg;
810     }
811   }
812 }