
1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.fs.FileStatus;
44  import org.apache.hadoop.fs.FileSystem;
45  import org.apache.hadoop.fs.Path;
46  import org.apache.hadoop.fs.PathFilter;
47  import org.apache.hadoop.hbase.ChoreService;
48  import org.apache.hadoop.hbase.CoordinatedStateManager;
49  import org.apache.hadoop.hbase.HRegionInfo;
50  import org.apache.hadoop.hbase.ScheduledChore;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.SplitLogCounters;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.classification.InterfaceAudience;
56  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
57  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
58  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
59  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
60  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
61  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63  import org.apache.hadoop.hbase.util.FSUtils;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
66  import org.apache.hadoop.hbase.wal.WALFactory;
67  
68  import com.google.common.annotations.VisibleForTesting;
69  
70  /**
71   * Distributes the task of log splitting to the available region servers.
72   * Coordination happens via the coordination engine. For every log file that has to be split, a
73   * task is created. SplitLogWorkers race to grab a task.
74   *
75   * <p>SplitLogManager monitors the tasks that it creates using the
76   * timeoutMonitor thread. If a task's progress is slow then
77   * {@link SplitLogManagerCoordination#checkTasks} will take away the
78   * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} 
79   * and the task will be up for grabs again. When the task is done, it is
80   * deleted by SplitLogManager.
81   *
82   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
83   * log files. The caller thread waits in this method until all the log files
84   * have been split.
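     *
     * <p>A hypothetical usage sketch; the {@code splitLogManager} and {@code logDir} variables
     * below are illustrative and not part of this class:
     * <pre>{@code
     *   // logDir is a dead region server's WAL directory under the HBase root directory
     *   long splitBytes = splitLogManager.splitLogDistributed(logDir); // blocks until all files are split
     * }</pre>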
85   *
86   * <p>All the coordination calls made by this class are asynchronous. This is mainly
87   * to help reduce response time seen by the callers.
88   *
89   * <p>There is a race in this design between the SplitLogManager and the
90   * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
91   * already been completed by a SplitLogWorker. We rely on the idempotency of
92   * the log splitting task for correctness.
93   *
94   * <p>It is also assumed that every log splitting task is unique and once
95   * completed (either with success or with error) it will not be submitted
96   * again. If a task is resubmitted then there is a risk that the old "delete task"
97   * can delete the re-submission.
98   */
99  @InterfaceAudience.Private
100 public class SplitLogManager {
101   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
102 
103   private Server server;
104 
105   private final Stoppable stopper;
106   private final Configuration conf;
107   private final ChoreService choreService;
108 
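      // The default can be overridden via the "hbase.splitlog.manager.unassigned.timeout"
      // configuration property, which is read in the constructor below.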
109   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
110 
111   private long unassignedTimeout;
112   private long lastTaskCreateTime = Long.MAX_VALUE;
113   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
114   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
115       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
116 
117   /**
118    * In distributedLogReplay mode, we need to touch both the splitlog and recovering-regions
119    * znodes in one operation, so this lock is used to guard such cases.
120    */
121   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
122 
123   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
124   private TimeoutMonitor timeoutMonitor;
125 
126   private volatile Set<ServerName> deadWorkers = null;
127   private final Object deadWorkersLock = new Object();
128 
129   /**
130    * It's OK to construct this object even when region servers are not online. It does look up
131    * orphan tasks in the coordination engine but doesn't block waiting for them to be done.
132    * @param server the server instance
133    * @param conf the HBase configuration
134    * @param stopper the stoppable in case anything is wrong
135    * @param master the master services
136    * @param serverName the master server name
137    * @throws IOException
138    */
139   public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
140       MasterServices master, ServerName serverName) throws IOException {
141     this.server = server;
142     this.conf = conf;
143     this.stopper = stopper;
144     this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
145     if (server.getCoordinatedStateManager() != null) {
146       SplitLogManagerCoordination coordination =
147           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
148               .getSplitLogManagerCoordination();
149       Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
150       SplitLogManagerDetails details =
151           new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
152       coordination.setDetails(details);
153       coordination.init();
154       // Determine recovery mode
155     }
156     this.unassignedTimeout =
157         conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
158     this.timeoutMonitor =
159         new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
160             stopper);
161     choreService.scheduleChore(timeoutMonitor);
162   }
163 
164   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
165     return getFileList(conf, logDirs, filter);
166   }
167 
168   /**
169    * Get a list of paths that need to be split, given a set of server-specific directories and
170    * optionally a filter.
171    *
172    * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
173    * layout.
174    *
175    * Should be package-private, but is needed by
176    * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
177    *     Configuration, WALFactory)} for tests.
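       *
       * <p>A hypothetical invocation from a test; the {@code conf} and {@code walDir} names are
       * illustrative:
       * <pre>{@code
       *   FileStatus[] toSplit =
       *       SplitLogManager.getFileList(conf, Collections.singletonList(walDir), null);
       * }</pre>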
178    */
179   @VisibleForTesting
180   public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
181       final PathFilter filter)
182       throws IOException {
183     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
184     for (Path logDir : logDirs) {
185       final FileSystem fs = logDir.getFileSystem(conf);
186       if (!fs.exists(logDir)) {
187         LOG.warn(logDir + " doesn't exist. Nothing to do!");
188         continue;
189       }
190       FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
191       if (logfiles == null || logfiles.length == 0) {
192         LOG.info(logDir + " is empty dir, no logs to split");
193       } else {
194         Collections.addAll(fileStatus, logfiles);
195       }
196     }
197     FileStatus[] a = new FileStatus[fileStatus.size()];
198     return fileStatus.toArray(a);
199   }
200 
201   /**
202    * @param logDir one region server WAL dir path in .logs
203    * @throws IOException if there was an error while splitting any log file
204    * @return cumulative size of the logfiles split
206    */
207   public long splitLogDistributed(final Path logDir) throws IOException {
208     List<Path> logDirs = new ArrayList<Path>();
209     logDirs.add(logDir);
210     return splitLogDistributed(logDirs);
211   }
212 
213   /**
214    * The caller will block until all the log files of the given region server have been processed -
215    * successfully split or an error is encountered - by an available worker region server. This
216    * method must only be called after the region servers have been brought online.
217    * @param logDirs List of log dirs to split
218    * @throws IOException If there was an error while splitting any log file
219    * @return cumulative size of the logfiles split
220    */
221   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
222     if (logDirs.isEmpty()) {
223       return 0;
224     }
225     Set<ServerName> serverNames = new HashSet<ServerName>();
226     for (Path logDir : logDirs) {
227       try {
228         ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
229         if (serverName != null) {
230           serverNames.add(serverName);
231         }
232       } catch (IllegalArgumentException e) {
233         // ignore invalid format error.
234         LOG.warn("Cannot parse server name from " + logDir);
235       }
236     }
237     return splitLogDistributed(serverNames, logDirs, null);
238   }
239 
240   /**
241   * The caller will block until all the log files of the given region servers have been
242   * processed - successfully split or an error is encountered - by an available worker region
243   * server. This method must only be called after the region servers have been brought online.
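      * @param serverNames servers whose WALs are being split; also used to clean up the
      *          recovering-region state for those servers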
244    * @param logDirs List of log dirs to split
245   * @param filter the Path filter to select specific files for consideration
246    * @throws IOException If there was an error while splitting any log file
247    * @return cumulative size of the logfiles split
248    */
249   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
250       PathFilter filter) throws IOException {
251     MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
252       logDirs + " for serverName=" + serverNames);
253     FileStatus[] logfiles = getFileList(logDirs, filter);
254     status.setStatus("Checking directory contents...");
255     LOG.debug("Scheduling batch of logs to split");
256     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
257     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs +
258       " for " + serverNames);
259     long t = EnvironmentEdgeManager.currentTime();
260     long totalSize = 0;
261     TaskBatch batch = new TaskBatch();
262     Boolean isMetaRecovery = (filter == null) ? null : false;
263     for (FileStatus lf : logfiles) {
264       // TODO If the log file is still being written to - which is most likely
265       // the case for the last log file - then its length will show up here
266       // as zero. The size of such a file can only be retrieved after
267       // recover-lease is done. totalSize will be under in most cases and the
268       // recover-lease is done. totalSize will be an underestimate in most cases and the
269       totalSize += lf.getLen();
270       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
271       if (!enqueueSplitTask(pathToLog, batch)) {
272         throw new IOException("duplicate log split scheduled for " + lf.getPath());
273       }
274     }
275     waitForSplittingCompletion(batch, status);
276     // remove recovering regions
277     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
278       // we split meta regions and user regions separately, therefore logfiles are either all for
279       // meta or all for user regions, but never for both (we could have mixed situations in tests)
280       isMetaRecovery = true;
281     }
282     removeRecoveringRegions(serverNames, isMetaRecovery);
283 
284     if (batch.done != batch.installed) {
285       batch.isDead = true;
286       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
287       LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
288           + " but only " + batch.done + " done");
289       String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
290       status.abort(msg);
291       throw new IOException(msg);
292     }
293     for (Path logDir : logDirs) {
294       status.setStatus("Cleaning up log directory...");
295       final FileSystem fs = logDir.getFileSystem(conf);
296       try {
297         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
298           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
299         }
300       } catch (IOException ioe) {
301         FileStatus[] files = fs.listStatus(logDir);
302         if (files != null && files.length > 0) {
303           LOG.warn("returning success without actually splitting and "
304               + "deleting all the log files in path " + logDir);
305         } else {
306           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
307         }
308       }
309       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
310     }
311     String msg =
312         "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
313             + " log files in " + logDirs + " in "
314             + (EnvironmentEdgeManager.currentTime() - t) + "ms";
315     status.markComplete(msg);
316     LOG.info(msg);
317     return totalSize;
318   }
319 
320   /**
321    * Add a task entry to coordination if it is not already there.
322    * @param taskname the path of the log to be split
323    * @param batch the batch this task belongs to
324    * @return true if a new entry is created, false if it is already there.
325    */
326   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
327     lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
328     String task =
329         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
330             .getSplitLogManagerCoordination().prepareTask(taskname);
331     Task oldtask = createTaskIfAbsent(task, batch);
332     if (oldtask == null) {
333       // publish the task in the coordination engine
334       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
335           .getSplitLogManagerCoordination().submitTask(task);
336       return true;
337     }
338     return false;
339   }
340 
341   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
342     synchronized (batch) {
343       while ((batch.done + batch.error) != batch.installed) {
344         try {
345           status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
346               + batch.installed + " done=" + batch.done + " error=" + batch.error);
347           int remaining = batch.installed - (batch.done + batch.error);
348           int actual = activeTasks(batch);
349           if (remaining != actual) {
350             LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
351           }
352           int remainingTasks =
353               ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
354                   .getSplitLogManagerCoordination().remainingTasksInCoordination();
355           if (remainingTasks >= 0 && actual > remainingTasks) {
356             LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
357                 + remainingTasks);
358           }
359           if (remainingTasks == 0 || actual == 0) {
360             LOG.warn("No more tasks remaining, splitting "
361                 + "should have completed. Remaining tasks = " + remainingTasks
362                 + ", active tasks in map = " + actual);
363             if (remainingTasks == 0 && actual == 0) {
364               return;
365             }
366           }
367           batch.wait(100);
368           if (stopper.isStopped()) {
369             LOG.warn("Stopped while waiting for log splits to be completed");
370             return;
371           }
372         } catch (InterruptedException e) {
373           LOG.warn("Interrupted while waiting for log splits to be completed");
374           Thread.currentThread().interrupt();
375           return;
376         }
377       }
378     }
379   }
380 
381   @VisibleForTesting
382   ConcurrentMap<String, Task> getTasks() {
383     return tasks;
384   }
385 
386   private int activeTasks(final TaskBatch batch) {
387     int count = 0;
388     for (Task t : tasks.values()) {
389       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
390         count++;
391       }
392     }
393     return count;
394 
395   }
396 
397   /**
398    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
399    * region server hosting the region can allow reads to the recovered region.
400    * @param serverNames servers which have just been recovered
401    * @param isMetaRecovery whether the current recovery is for the meta region on
402    *          <code>serverNames</code>
403    */
404   private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
405     if (!isLogReplaying()) {
406       // the function is only used in WALEdit direct replay mode
407       return;
408     }
409 
410     Set<String> recoveredServerNameSet = new HashSet<String>();
411     if (serverNames != null) {
412       for (ServerName tmpServerName : serverNames) {
413         recoveredServerNameSet.add(tmpServerName.getServerName());
414       }
415     }
416 
417     try {
418       this.recoveringRegionLock.lock();
419       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
420           .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
421             isMetaRecovery);
422     } catch (IOException e) {
423       LOG.warn("removeRecoveringRegions got exception. Will retry", e);
424       if (serverNames != null && !serverNames.isEmpty()) {
425         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
426             isMetaRecovery));
427       }
428     } finally {
429       this.recoveringRegionLock.unlock();
430     }
431   }
432 
433   /**
434    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
435    * during master initialization phase.
436    * @param failedServers A set of known failed servers
437    * @throws IOException
438    */
439   void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
440       InterruptedIOException {
441     Set<String> knownFailedServers = new HashSet<String>();
442     if (failedServers != null) {
443       for (ServerName tmpServerName : failedServers) {
444         knownFailedServers.add(tmpServerName.getServerName());
445       }
446     }
447 
448     this.recoveringRegionLock.lock();
449     try {
450       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
451           .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
452     } finally {
453       this.recoveringRegionLock.unlock();
454     }
455   }
456 
457   /**
458    * @param path the name under which the split task is tracked in the {@code tasks} map
459    * @param batch the batch this task belongs to
460    * @return null on success, existing task on error
461    */
462   private Task createTaskIfAbsent(String path, TaskBatch batch) {
463     Task oldtask;
464     // batch.installed is only changed via this function and
465     // a single thread touches batch.installed.
466     Task newtask = new Task();
467     newtask.batch = batch;
468     oldtask = tasks.putIfAbsent(path, newtask);
469     if (oldtask == null) {
470       batch.installed++;
471       return null;
472     }
473     // new task was not used.
474     synchronized (oldtask) {
475       if (oldtask.isOrphan()) {
476         if (oldtask.status == SUCCESS) {
477           // The task is already done. Do not install the batch for this
478           // task because it might be too late for setDone() to update
479           // batch.done. There is no need for the batch creator to wait for
480           // this task to complete.
481           return (null);
482         }
483         if (oldtask.status == IN_PROGRESS) {
484           oldtask.batch = batch;
485           batch.installed++;
486           LOG.debug("Previously orphan task " + path + " is now being waited upon");
487           return null;
488         }
489         while (oldtask.status == FAILURE) {
490           LOG.debug("wait for status of task " + path + " to change to DELETED");
491           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
492           try {
493             oldtask.wait();
494           } catch (InterruptedException e) {
495             Thread.currentThread().interrupt();
496             LOG.warn("Interrupted when waiting for znode delete callback");
497             // fall through to return failure
498             break;
499           }
500         }
501         if (oldtask.status != DELETED) {
502           LOG.warn("Failure because previously failed task"
503               + " state still present. Waiting for znode delete callback" + " path=" + path);
504           return oldtask;
505         }
506         // reinsert the newTask and it must succeed this time
507         Task t = tasks.putIfAbsent(path, newtask);
508         if (t == null) {
509           batch.installed++;
510           return null;
511         }
512         LOG.fatal("Logic error. Deleted task still present in tasks map");
513         assert false : "Deleted task still present in tasks map";
514         return t;
515       }
516       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
517       return oldtask;
518     }
519   }
520 
521   Task findOrCreateOrphanTask(String path) {
522     Task orphanTask = new Task();
523     Task task;
524     task = tasks.putIfAbsent(path, orphanTask);
525     if (task == null) {
526       LOG.info("creating orphan task " + path);
527       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
528       task = orphanTask;
529     }
530     return task;
531   }
532 
533   public void stop() {
534     if (choreService != null) {
535       choreService.shutdown();
536     }
537     if (timeoutMonitor != null) {
538       timeoutMonitor.cancel(true);
539     }
540   }
541 
542   void handleDeadWorker(ServerName workerName) {
543     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
544     // to reason about concurrency. Makes it easier to retry.
545     synchronized (deadWorkersLock) {
546       if (deadWorkers == null) {
547         deadWorkers = new HashSet<ServerName>(100);
548       }
549       deadWorkers.add(workerName);
550     }
551     LOG.info("dead splitlog worker " + workerName);
552   }
553 
554   void handleDeadWorkers(Set<ServerName> serverNames) {
555     synchronized (deadWorkersLock) {
556       if (deadWorkers == null) {
557         deadWorkers = new HashSet<ServerName>(100);
558       }
559       deadWorkers.addAll(serverNames);
560     }
561     LOG.info("dead splitlog workers " + serverNames);
562   }
563 
564   /**
565    * Sets the recovery mode based on outstanding split log tasks from a previous run, or on the
566    * current configuration setting.
567    * @param isForInitialization whether this is being called during master initialization
568    * @throws IOException if it's impossible to set recovery mode
569    */
570   public void setRecoveryMode(boolean isForInitialization) throws IOException {
571     ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
572         .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
573 
574   }
575 
576   public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
577       throws InterruptedIOException, IOException {
578     if (userRegions == null || (!isLogReplaying())) {
579       return;
580     }
581     try {
582       this.recoveringRegionLock.lock();
583       // mark that we're creating recovering regions
584       ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
585           .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
586     } finally {
587       this.recoveringRegionLock.unlock();
588     }
589 
590   }
591 
592   /**
593    * @return whether log is replaying
594    */
595   public boolean isLogReplaying() {
596     CoordinatedStateManager m = server.getCoordinatedStateManager();
597     if (m == null) return false;
598     return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
599   }
600 
601   /**
602    * @return whether log is splitting
603    */
604   public boolean isLogSplitting() {
605     if (server.getCoordinatedStateManager() == null) return false;
606     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
607         .getSplitLogManagerCoordination().isSplitting();
608   }
609 
610   /**
611    * @return the current log recovery mode
612    */
613   public RecoveryMode getRecoveryMode() {
614     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
615         .getSplitLogManagerCoordination().getRecoveryMode();
616   }
617 
618   /**
619    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
620   * Client threads use this object to wait for all their tasks to be done.
621    * <p>
622    * All access is synchronized.
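      * <p>A simplified sketch of the wait pattern used by {@code waitForSplittingCompletion} in the
      * enclosing class (the real method also consults the coordination engine and the stopper):
      * <pre>{@code
      *   synchronized (batch) {
      *     while ((batch.done + batch.error) != batch.installed) {
      *       batch.wait(100);
      *     }
      *   }
      * }</pre>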
623    */
624   @InterfaceAudience.Private
625   public static class TaskBatch {
626     public int installed = 0;
627     public int done = 0;
628     public int error = 0;
629     public volatile boolean isDead = false;
630 
631     @Override
632     public String toString() {
633       return ("installed = " + installed + " done = " + done + " error = " + error);
634     }
635   }
636 
637   /**
638    * In-memory state of an active task.
639    */
640   @InterfaceAudience.Private
641   public static class Task {
642     public volatile long last_update;
643     public volatile int last_version;
644     public volatile ServerName cur_worker_name;
645     public volatile TaskBatch batch;
646     public volatile TerminationStatus status;
647     public volatile int incarnation;
648     public final AtomicInteger unforcedResubmits = new AtomicInteger();
649     public volatile boolean resubmitThresholdReached;
650 
651     @Override
652     public String toString() {
653       return ("last_update = " + last_update + " last_version = " + last_version
654           + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
655           + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
656     }
657 
658     public Task() {
659       incarnation = 0;
660       last_version = -1;
661       status = IN_PROGRESS;
662       setUnassigned();
663     }
664 
665     public boolean isOrphan() {
666       return (batch == null || batch.isDead);
667     }
668 
669     public boolean isUnassigned() {
670       return (cur_worker_name == null);
671     }
672 
673     public void heartbeatNoDetails(long time) {
674       last_update = time;
675     }
676 
677     public void heartbeat(long time, int version, ServerName worker) {
678       last_version = version;
679       last_update = time;
680       cur_worker_name = worker;
681     }
682 
683     public void setUnassigned() {
684       cur_worker_name = null;
685       last_update = -1;
686     }
687   }
688 
689   /**
690    * Periodically checks all active tasks and resubmits the ones that have timed out.
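       * The check period comes from the "hbase.splitlog.manager.timeoutmonitor.period"
       * configuration property (1000 ms by default), as read in the SplitLogManager constructor.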
691    */
692   private class TimeoutMonitor extends ScheduledChore {
693     private long lastLog = 0;
694 
695     public TimeoutMonitor(final int period, Stoppable stopper) {
696       super("SplitLogManager Timeout Monitor", stopper, period);
697     }
698 
699     @Override
700     protected void chore() {
701       int resubmitted = 0;
702       int unassigned = 0;
703       int tot = 0;
704       boolean found_assigned_task = false;
705       Set<ServerName> localDeadWorkers;
706 
707       synchronized (deadWorkersLock) {
708         localDeadWorkers = deadWorkers;
709         deadWorkers = null;
710       }
711 
712       for (Map.Entry<String, Task> e : tasks.entrySet()) {
713         String path = e.getKey();
714         Task task = e.getValue();
715         ServerName cur_worker = task.cur_worker_name;
716         tot++;
717         // don't easily resubmit a task which hasn't been picked up yet. It
718         // might be a long while before a SplitLogWorker is free to pick up a
719         // task. This is because a SplitLogWorker picks up a task one at a
720         // time. If we want progress when there are no region servers then we
721         // will have to run a SplitLogWorker thread in the Master.
722         if (task.isUnassigned()) {
723           unassigned++;
724           continue;
725         }
726         found_assigned_task = true;
727         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
728           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
729           if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
730               .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
731             resubmitted++;
732           } else {
733             handleDeadWorker(cur_worker);
734             LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
735                 + ", will retry.");
736           }
737         } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
738             .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
739           resubmitted++;
740         }
741       }
742       if (tot > 0) {
743         long now = EnvironmentEdgeManager.currentTime();
744         if (now > lastLog + 5000) {
745           lastLog = now;
746           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
747         }
748       }
749       if (resubmitted > 0) {
750         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
751       }
752       // If there are pending tasks and all of them have been unassigned for
753       // some time then put up a RESCAN node to ping the workers.
754       // DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
755       // because (a) it is very unlikely that every worker had a
756       // transient error when trying to grab the task and (b) if there are no
757       // workers then all tasks will stay unassigned indefinitely and the
758       // manager will keep creating RESCAN nodes indefinitely. TODO: maybe the
759       // master should spawn both a manager and a worker thread to guarantee
760       // that there is always one worker in the system.
761       if (tot > 0
762           && !found_assigned_task
763           && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
764         for (Map.Entry<String, Task> e : tasks.entrySet()) {
765           String key = e.getKey();
766           Task task = e.getValue();
767           // we have to do task.isUnassigned() check again because tasks might
768           // have been asynchronously assigned. There is no locking required
769           // for these checks ... it is OK even if checkTaskStillAvailable() is
770           // called unnecessarily for a task path
771           if (task.isUnassigned() && (task.status != FAILURE)) {
772             // We just touch the znode to make sure its still there
773             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
774                 .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
775           }
776         }
777         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
778             .getSplitLogManagerCoordination().checkTasks();
779         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
780         LOG.debug("resubmitting unassigned task(s) after timeout");
781       }
782       Set<String> failedDeletions =
783           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
784               .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
785       // Retry previously failed deletes
786       if (failedDeletions.size() > 0) {
787         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
788         for (String tmpPath : tmpPaths) {
789           // deleteNode is an async call
790           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
791               .getSplitLogManagerCoordination().deleteTask(tmpPath);
792         }
793         failedDeletions.removeAll(tmpPaths);
794       }
795 
796       // Garbage collect left-over recovering-region state
797       long timeInterval =
798           EnvironmentEdgeManager.currentTime()
799               - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
800                   .getSplitLogManagerCoordination().getLastRecoveryTime();
801       if (!failedRecoveringRegionDeletions.isEmpty()
802           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
803         // inside the function there are more checks before GC'ing anything
804         if (!failedRecoveringRegionDeletions.isEmpty()) {
805           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
806               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
807           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
808           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
809             removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
810           }
811         } else {
812           removeRecoveringRegions(null, null);
813         }
814       }
815     }
816   }
817 
818   public enum ResubmitDirective {
819     CHECK(), FORCE();
820   }
821 
822   public enum TerminationStatus {
823     IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
824 
825     String statusMsg;
826 
827     TerminationStatus(String msg) {
828       statusMsg = msg;
829     }
830 
831     @Override
832     public String toString() {
833       return statusMsg;
834     }
835   }
836 }