1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.CoordinatedStateManager;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.SplitLogCounters;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
56  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
57  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
58  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
61  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
62  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
63  import org.apache.hadoop.hbase.util.FSUtils;
64  import org.apache.hadoop.hbase.util.Pair;
65  import org.apache.hadoop.hbase.util.Threads;
66  
67  import com.google.common.annotations.VisibleForTesting;
68  
69  /**
70   * Distributes the task of log splitting to the available region servers.
71   * Coordination happens via the coordination engine. For every log file that has to be split, a
72   * task is created. SplitLogWorkers race to grab a task.
73   *
74   * <p>SplitLogManager monitors the tasks that it creates using the
75   * timeoutMonitor thread. If a task's progress is slow then
76   * {@link SplitLogManagerCoordination#checkTasks} will take away the
77   * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker} 
78   * and the task will be up for grabs again. When the task is done it is
79   * deleted by SplitLogManager.
80   *
81   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
82   * log files. The caller thread waits in this method until all the log files
83   * have been split.
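 *
 * <p>A minimal usage sketch (the <code>server</code>, <code>conf</code>, <code>stopper</code>,
 * <code>masterServices</code> and <code>serverName</code> objects and the WAL directory below are
 * illustrative assumptions, not values defined in this class):
 * <pre>
 *   SplitLogManager slm =
 *       new SplitLogManager(server, conf, stopper, masterServices, serverName);
 *   // Blocks until every log under the dead server's WAL directory has been split.
 *   long bytesSplit = slm.splitLogDistributed(
 *       new Path("/hbase/WALs/rs1.example.com,16020,1470000000000-splitting"));
 * </pre>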
84   *
85   * <p>All the coordination calls made by this class are asynchronous. This is mainly
86   * to help reduce response time seen by the callers.
87   *
88   * <p>There is a race in this design between the SplitLogManager and the
89   * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
90   * already been completed by a SplitLogWorker. We rely on the idempotency of
91   * the log splitting task for correctness.
92   *
93   * <p>It is also assumed that every log splitting task is unique and once
94   * completed (either with success or with error) it will not be submitted
95   * again. If a task is resubmitted then there is a risk that the old "delete task"
96   * can delete the re-submission.
97   */
98  @InterfaceAudience.Private
99  public class SplitLogManager {
100   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
101 
102   private Server server;
103 
104   private final Stoppable stopper;
105   private final Configuration conf;
106 
107   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
108 
109   private long unassignedTimeout;
110   private long lastTaskCreateTime = Long.MAX_VALUE;
111   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
112   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
113       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
114 
115   /**
116    * In distributedLogReplay mode, we need to touch both the splitlog and recovering-regions znodes
117    * in one operation, so this lock is used to guard such cases.
118    */
119   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
120 
121   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
122   private TimeoutMonitor timeoutMonitor;
123 
124   private volatile Set<ServerName> deadWorkers = null;
125   private final Object deadWorkersLock = new Object();
126 
127   /**
128    * It is OK to construct this object even when region servers are not online. It does look up
129    * orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
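   *
   * <p>Configuration keys read here, shown with their default values (an illustrative sketch;
   * setting them explicitly is optional):
   * <pre>
   *   conf.setInt("hbase.splitlog.manager.unassigned.timeout", 3 * 60 * 1000); // DEFAULT_UNASSIGNED_TIMEOUT
   *   conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);       // timeout monitor period, ms
   * </pre>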
130    * @param server the server instance
131    * @param conf the HBase configuration
132    * @param stopper the stoppable in case anything is wrong
133    * @param master the master services
134    * @param serverName the master server name
135    * @throws IOException
136    */
137   public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
138       MasterServices master, ServerName serverName) throws IOException {
139     this.server = server;
140     this.conf = conf;
141     this.stopper = stopper;
142     if (server.getCoordinatedStateManager() != null) {
143       SplitLogManagerCoordination coordination =
144           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
145               .getSplitLogManagerCoordination();
146       Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
147       SplitLogManagerDetails details =
148           new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
149       coordination.init();
150       coordination.setDetails(details);
151       // Determine recovery mode
152     }
153     this.unassignedTimeout =
154         conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
155     this.timeoutMonitor =
156         new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
157             stopper);
158     Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
159         + ".splitLogManagerTimeoutMonitor");
160   }
161 
162   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
163     return getFileList(conf, logDirs, filter);
164   }
165 
166   /**
167    * Get a list of paths that need to be split given a set of server-specific directories and
168    * optionally a filter.
169    *
170    * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
171    * layout.
172    *
173    * Should be package-private, but is needed by
174    * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
175    *     Configuration, WALFactory)} for tests.
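   *
   * <p>Illustrative call (the configuration and directory below are assumptions; a
   * <code>null</code> filter selects all files in the directory):
   * <pre>
   *   List&lt;Path&gt; dirs =
   *       Collections.singletonList(new Path("/hbase/WALs/rs1.example.com,16020,1470000000000-splitting"));
   *   FileStatus[] toSplit = SplitLogManager.getFileList(conf, dirs, null);
   * </pre>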
176    */
177   @VisibleForTesting
178   public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
179       final PathFilter filter)
180       throws IOException {
181     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
182     for (Path logDir : logDirs) {
183       final FileSystem fs = logDir.getFileSystem(conf);
184       if (!fs.exists(logDir)) {
185         LOG.warn(logDir + " doesn't exist. Nothing to do!");
186         continue;
187       }
188       FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
189       if (logfiles == null || logfiles.length == 0) {
190         LOG.info(logDir + " is empty dir, no logs to split");
191       } else {
192         Collections.addAll(fileStatus, logfiles);
193       }
194     }
195     FileStatus[] a = new FileStatus[fileStatus.size()];
196     return fileStatus.toArray(a);
197   }
198 
199   /**
200    * @param logDir one region server WAL dir path in .logs
201    * @throws IOException if there was an error while splitting any log file
202    * @return cumulative size of the logfiles split
204    */
205   public long splitLogDistributed(final Path logDir) throws IOException {
206     List<Path> logDirs = new ArrayList<Path>();
207     logDirs.add(logDir);
208     return splitLogDistributed(logDirs);
209   }
210 
211   /**
212    * The caller will block until all the log files of the given region server have been processed -
213    * successfully split or an error is encountered - by an available worker region server. This
214    * method must only be called after the region servers have been brought online.
215    * @param logDirs List of log dirs to split
216    * @throws IOException If there was an error while splitting any log file
217    * @return cumulative size of the logfiles split
218    */
219   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
220     if (logDirs.isEmpty()) {
221       return 0;
222     }
223     Set<ServerName> serverNames = new HashSet<ServerName>();
224     for (Path logDir : logDirs) {
225       try {
226         ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
227         if (serverName != null) {
228           serverNames.add(serverName);
229         }
230       } catch (IllegalArgumentException e) {
231         // ignore invalid format error.
232         LOG.warn("Cannot parse server name from " + logDir);
233       }
234     }
235     return splitLogDistributed(serverNames, logDirs, null);
236   }
237 
238   /**
239    * The caller will block until all the log files of the given region servers have been
240    * processed - successfully split or an error is encountered - by an available worker region
241    * server. This method must only be called after the region servers have been brought online.
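   *
   * <p>For example, hbase:meta logs can be split separately from user-region logs by passing the
   * meta filter (a sketch; <code>serverNames</code> and <code>logDirs</code> are assumed to have
   * been computed by the caller):
   * <pre>
   *   long bytes = splitLogDistributed(serverNames, logDirs, MasterFileSystem.META_FILTER);
   * </pre>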
242    * @param logDirs List of log dirs to split
243    * @param filter the Path filter to select specific files for consideration
244    * @throws IOException If there was an error while splitting any log file
245    * @return cumulative size of the logfiles split
246    */
247   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
248       PathFilter filter) throws IOException {
249     MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
250       logDirs + " for serverName=" + serverNames);
251     FileStatus[] logfiles = getFileList(logDirs, filter);
252     status.setStatus("Checking directory contents...");
253     LOG.debug("Scheduling batch of logs to split");
254     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
255     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs +
256       " for " + serverNames);
257     long t = EnvironmentEdgeManager.currentTime();
258     long totalSize = 0;
259     TaskBatch batch = new TaskBatch();
260     Boolean isMetaRecovery = (filter == null) ? null : false;
261     for (FileStatus lf : logfiles) {
262       // TODO If the log file is still being written to - which is most likely
263       // the case for the last log file - then its length will show up here
264       // as zero. The size of such a file can only be retrieved after
265       // recover-lease is done. totalSize will be under in most cases and the
266       // metrics that it drives will also be under-reported.
267       totalSize += lf.getLen();
268       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
269       if (!enqueueSplitTask(pathToLog, batch)) {
270         throw new IOException("duplicate log split scheduled for " + lf.getPath());
271       }
272     }
273     waitForSplittingCompletion(batch, status);
274     // remove recovering regions
275     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
276       // we split meta regions and user regions separately, therefore the logfiles are either all
277       // for meta or all for user regions, never both (we could only have mixed situations in tests)
278       isMetaRecovery = true;
279     }
280     removeRecoveringRegions(serverNames, isMetaRecovery);
281 
282     if (batch.done != batch.installed) {
283       batch.isDead = true;
284       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
285       LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
286           + " but only " + batch.done + " done");
287       String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
288       status.abort(msg);
289       throw new IOException(msg);
290     }
291     for (Path logDir : logDirs) {
292       status.setStatus("Cleaning up log directory...");
293       final FileSystem fs = logDir.getFileSystem(conf);
294       try {
295         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
296           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
297         }
298       } catch (IOException ioe) {
299         FileStatus[] files = fs.listStatus(logDir);
300         if (files != null && files.length > 0) {
301           LOG.warn("returning success without actually splitting and "
302               + "deleting all the log files in path " + logDir);
303         } else {
304           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
305         }
306       }
307       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
308     }
309     String msg =
310         "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
311             + " log files in " + logDirs + " in "
312             + (EnvironmentEdgeManager.currentTime() - t) + "ms";
313     status.markComplete(msg);
314     LOG.info(msg);
315     return totalSize;
316   }
317 
318   /**
319    * Add a task entry to coordination if it is not already there.
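   * <p>Callers treat a <code>false</code> return as a duplicate submission; for example,
   * {@link #splitLogDistributed(Set, List, PathFilter)} does:
   * <pre>
   *   if (!enqueueSplitTask(pathToLog, batch)) {
   *     throw new IOException("duplicate log split scheduled for " + lf.getPath());
   *   }
   * </pre>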
320    * @param taskname the path of the log to be split
321    * @param batch the batch this task belongs to
322    * @return true if a new entry is created, false if it is already there.
323    */
324   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
325     lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
326     String task =
327         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
328             .getSplitLogManagerCoordination().prepareTask(taskname);
329     Task oldtask = createTaskIfAbsent(task, batch);
330     if (oldtask == null) {
331       // publish the task in the coordination engine
332       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
333           .getSplitLogManagerCoordination().submitTask(task);
334       return true;
335     }
336     return false;
337   }
338 
339   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
340     synchronized (batch) {
341       while ((batch.done + batch.error) != batch.installed) {
342         try {
343           status.setStatus("Waiting for distributed tasks to finish. " + " scheduled="
344               + batch.installed + " done=" + batch.done + " error=" + batch.error);
345           int remaining = batch.installed - (batch.done + batch.error);
346           int actual = activeTasks(batch);
347           if (remaining != actual) {
348             LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
349           }
350           int remainingTasks =
351               ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
352                   .getSplitLogManagerCoordination().remainingTasksInCoordination();
353           if (remainingTasks >= 0 && actual > remainingTasks) {
354             LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
355                 + remainingTasks);
356           }
357           if (remainingTasks == 0 || actual == 0) {
358             LOG.warn("No more tasks remaining, splitting "
359                 + "should have completed. Remaining tasks is " + remainingTasks
360                 + ", active tasks in map " + actual);
361             if (remainingTasks == 0 && actual == 0) {
362               return;
363             }
364           }
365           batch.wait(100);
366           if (stopper.isStopped()) {
367             LOG.warn("Stopped while waiting for log splits to be completed");
368             return;
369           }
370         } catch (InterruptedException e) {
371           LOG.warn("Interrupted while waiting for log splits to be completed");
372           Thread.currentThread().interrupt();
373           return;
374         }
375       }
376     }
377   }
378 
379   @VisibleForTesting
380   ConcurrentMap<String, Task> getTasks() {
381     return tasks;
382   }
383 
384   private int activeTasks(final TaskBatch batch) {
385     int count = 0;
386     for (Task t : tasks.values()) {
387       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
388         count++;
389       }
390     }
391     return count;
392 
393   }
394 
395   /**
396    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
397    * region server hosting the region can allow reads to the recovered region
398    * @param serverNames servers which are just recovered
399    * @param isMetaRecovery whether current recovery is for the meta region on
400    *          <code>serverNames</code>
401    */
402   private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
403     if (!isLogReplaying()) {
404       // the function is only used in WALEdit direct replay mode
405       return;
406     }
407 
408     Set<String> recoveredServerNameSet = new HashSet<String>();
409     if (serverNames != null) {
410       for (ServerName tmpServerName : serverNames) {
411         recoveredServerNameSet.add(tmpServerName.getServerName());
412       }
413     }
414 
415     try {
416       this.recoveringRegionLock.lock();
417       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
418           .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
419             isMetaRecovery);
420     } catch (IOException e) {
421       LOG.warn("removeRecoveringRegions got exception. Will retry", e);
422       if (serverNames != null && !serverNames.isEmpty()) {
423         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
424             isMetaRecovery));
425       }
426     } finally {
427       this.recoveringRegionLock.unlock();
428     }
429   }
430 
431   /**
432    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
433    * during master initialization phase.
434    * @param failedServers A set of known failed servers
435    * @throws IOException
436    */
437   void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
438       InterruptedIOException {
439     Set<String> knownFailedServers = new HashSet<String>();
440     if (failedServers != null) {
441       for (ServerName tmpServerName : failedServers) {
442         knownFailedServers.add(tmpServerName.getServerName());
443       }
444     }
445 
446     this.recoveringRegionLock.lock();
447     try {
448       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
449           .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
450     } finally {
451       this.recoveringRegionLock.unlock();
452     }
453   }
454 
455   /**
456    * @param path the task node path that identifies the log to be split
457    * @param batch the batch that a newly created task should be added to
458    * @return null on success, existing task on error
459    */
460   private Task createTaskIfAbsent(String path, TaskBatch batch) {
461     Task oldtask;
462     // batch.installed is only changed via this function and
463     // a single thread touches batch.installed.
464     Task newtask = new Task();
465     newtask.batch = batch;
466     oldtask = tasks.putIfAbsent(path, newtask);
467     if (oldtask == null) {
468       batch.installed++;
469       return null;
470     }
471     // new task was not used.
472     synchronized (oldtask) {
473       if (oldtask.isOrphan()) {
474         if (oldtask.status == SUCCESS) {
475           // The task is already done. Do not install the batch for this
476           // task because it might be too late for setDone() to update
477           // batch.done. There is no need for the batch creator to wait for
478           // this task to complete.
479           return (null);
480         }
481         if (oldtask.status == IN_PROGRESS) {
482           oldtask.batch = batch;
483           batch.installed++;
484           LOG.debug("Previously orphan task " + path + " is now being waited upon");
485           return null;
486         }
487         while (oldtask.status == FAILURE) {
488           LOG.debug("wait for status of task " + path + " to change to DELETED");
489           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
490           try {
491             oldtask.wait();
492           } catch (InterruptedException e) {
493             Thread.currentThread().interrupt();
494             LOG.warn("Interrupted when waiting for znode delete callback");
495             // fall through to return failure
496             break;
497           }
498         }
499         if (oldtask.status != DELETED) {
500           LOG.warn("Failure because previously failed task"
501               + " state still present. Waiting for znode delete callback" + " path=" + path);
502           return oldtask;
503         }
504         // reinsert the newTask and it must succeed this time
505         Task t = tasks.putIfAbsent(path, newtask);
506         if (t == null) {
507           batch.installed++;
508           return null;
509         }
510         LOG.fatal("Logic error. Deleted task still present in tasks map");
511         assert false : "Deleted task still present in tasks map";
512         return t;
513       }
514       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
515       return oldtask;
516     }
517   }
518 
519   Task findOrCreateOrphanTask(String path) {
520     Task orphanTask = new Task();
521     Task task;
522     task = tasks.putIfAbsent(path, orphanTask);
523     if (task == null) {
524       LOG.info("creating orphan task " + path);
525       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
526       task = orphanTask;
527     }
528     return task;
529   }
530 
531   public void stop() {
532     if (timeoutMonitor != null) {
533       timeoutMonitor.interrupt();
534     }
535   }
536 
537   void handleDeadWorker(ServerName workerName) {
538     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
539     // to reason about concurrency. Makes it easier to retry.
540     synchronized (deadWorkersLock) {
541       if (deadWorkers == null) {
542         deadWorkers = new HashSet<ServerName>(100);
543       }
544       deadWorkers.add(workerName);
545     }
546     LOG.info("dead splitlog worker " + workerName);
547   }
548 
549   void handleDeadWorkers(Set<ServerName> serverNames) {
550     synchronized (deadWorkersLock) {
551       if (deadWorkers == null) {
552         deadWorkers = new HashSet<ServerName>(100);
553       }
554       deadWorkers.addAll(serverNames);
555     }
556     LOG.info("dead splitlog workers " + serverNames);
557   }
558 
559   /**
560    * Sets the recovery mode, based either on outstanding split log tasks left over from before or
561    * on the current configuration setting
562    * @param isForInitialization
563    * @throws IOException if it is impossible to set the recovery mode
564    */
565   public void setRecoveryMode(boolean isForInitialization) throws IOException {
566     ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
567         .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
568 
569   }
570 
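  /**
   * Marks the regions of the failed <code>server</code> as recovering via the coordination engine
   * so that reads are held off until their edits have been replayed. Does nothing if
   * <code>userRegions</code> is null or distributed log replay is not in use.
   * @param server the failed region server whose regions are being recovered
   * @param userRegions the user regions that were hosted on the failed server
   * @throws InterruptedIOException
   * @throws IOException
   */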
571   public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
572       throws InterruptedIOException, IOException {
573     if (userRegions == null || (!isLogReplaying())) {
574       return;
575     }
576     try {
577       this.recoveringRegionLock.lock();
578       // mark that we're creating recovering regions
579       ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
580           .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
581     } finally {
582       this.recoveringRegionLock.unlock();
583     }
584 
585   }
586 
587   /**
588    * @return whether the current recovery mode is log replay
589    */
590   public boolean isLogReplaying() {
591     CoordinatedStateManager m = server.getCoordinatedStateManager();
592     if (m == null) return false;
593     return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
594   }
595 
596   /**
597    * @return whether the current recovery mode is log splitting
598    */
599   public boolean isLogSplitting() {
600     if (server.getCoordinatedStateManager() == null) return false;
601     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
602         .getSplitLogManagerCoordination().isSplitting();
603   }
604 
605   /**
606    * @return the current log recovery mode
607    */
608   public RecoveryMode getRecoveryMode() {
609     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
610         .getSplitLogManagerCoordination().getRecoveryMode();
611   }
612 
613   /**
614    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
615    * Client threads use this object to wait for all their tasks to be done.
616    * <p>
617    * All access is synchronized.
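   * <p>
   * A condensed sketch of the waiting pattern used by {@code waitForSplittingCompletion}:
   * <pre>
   *   synchronized (batch) {
   *     while ((batch.done + batch.error) != batch.installed) {
   *       batch.wait(100); // done/error are updated asynchronously as tasks finish
   *     }
   *   }
   * </pre>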
618    */
619   @InterfaceAudience.Private
620   public static class TaskBatch {
621     public int installed = 0;
622     public int done = 0;
623     public int error = 0;
624     public volatile boolean isDead = false;
625 
626     @Override
627     public String toString() {
628       return ("installed = " + installed + " done = " + done + " error = " + error);
629     }
630   }
631 
632   /**
633    * In-memory state of an active task.
634    */
635   @InterfaceAudience.Private
636   public static class Task {
637     public volatile long last_update;
638     public volatile int last_version;
639     public volatile ServerName cur_worker_name;
640     public volatile TaskBatch batch;
641     public volatile TerminationStatus status;
642     public volatile int incarnation;
643     public final AtomicInteger unforcedResubmits = new AtomicInteger();
644     public volatile boolean resubmitThresholdReached;
645 
646     @Override
647     public String toString() {
648       return ("last_update = " + last_update + " last_version = " + last_version
649           + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
650           + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
651     }
652 
653     public Task() {
654       incarnation = 0;
655       last_version = -1;
656       status = IN_PROGRESS;
657       setUnassigned();
658     }
659 
660     public boolean isOrphan() {
661       return (batch == null || batch.isDead);
662     }
663 
664     public boolean isUnassigned() {
665       return (cur_worker_name == null);
666     }
667 
668     public void heartbeatNoDetails(long time) {
669       last_update = time;
670     }
671 
672     public void heartbeat(long time, int version, ServerName worker) {
673       last_version = version;
674       last_update = time;
675       cur_worker_name = worker;
676     }
677 
678     public void setUnassigned() {
679       cur_worker_name = null;
680       last_update = -1;
681     }
682   }
683 
684   /**
685    * Periodically checks all active tasks and resubmits the ones that have timed out
686    */
687   private class TimeoutMonitor extends Chore {
688     private long lastLog = 0;
689 
690     public TimeoutMonitor(final int period, Stoppable stopper) {
691       super("SplitLogManager Timeout Monitor", period, stopper);
692     }
693 
694     @Override
695     protected void chore() {
696       int resubmitted = 0;
697       int unassigned = 0;
698       int tot = 0;
699       boolean found_assigned_task = false;
700       Set<ServerName> localDeadWorkers;
701 
702       synchronized (deadWorkersLock) {
703         localDeadWorkers = deadWorkers;
704         deadWorkers = null;
705       }
706 
707       for (Map.Entry<String, Task> e : tasks.entrySet()) {
708         String path = e.getKey();
709         Task task = e.getValue();
710         ServerName cur_worker = task.cur_worker_name;
711         tot++;
712         // don't easily resubmit a task which hasn't been picked up yet. It
713         // might be a long while before a SplitLogWorker is free to pick up a
714         // task. This is because a SplitLogWorker picks up a task one at a
715         // time. If we want progress when there are no region servers then we
716         // will have to run a SplitLogWorker thread in the Master.
717         if (task.isUnassigned()) {
718           unassigned++;
719           continue;
720         }
721         found_assigned_task = true;
722         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
723           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
724           if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
725               .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
726             resubmitted++;
727           } else {
728             handleDeadWorker(cur_worker);
729             LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
730                 + ", will retry.");
731           }
732         } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
733             .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
734           resubmitted++;
735         }
736       }
737       if (tot > 0) {
738         long now = EnvironmentEdgeManager.currentTime();
739         if (now > lastLog + 5000) {
740           lastLog = now;
741           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
742         }
743       }
744       if (resubmitted > 0) {
745         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
746       }
747       // If there are pending tasks and all of them have been unassigned for
748       // some time then put up a RESCAN node to ping the workers.
749       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
750       // because (a) it is very unlikely that every worker had a
751       // transient error when trying to grab the task and (b) if there are no
752       // workers then all tasks will stay unassigned indefinitely and the
753       // manager will be indefinitely creating RESCAN nodes. TODO: maybe the
754       // master should spawn both a manager and a worker thread to guarantee
755       // that there is always one worker in the system
756       if (tot > 0
757           && !found_assigned_task
758           && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
759         for (Map.Entry<String, Task> e : tasks.entrySet()) {
760           String key = e.getKey();
761           Task task = e.getValue();
762           // we have to do task.isUnassigned() check again because tasks might
763           // have been asynchronously assigned. There is no locking required
764           // for these checks ... it is OK even if tryGetDataSetWatch() is
765           // called unnecessarily for a taskpath
766           if (task.isUnassigned() && (task.status != FAILURE)) {
767             // We just touch the znode to make sure its still there
768             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
769                 .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
770           }
771         }
772         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
773             .getSplitLogManagerCoordination().checkTasks();
774         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
775         LOG.debug("resubmitting unassigned task(s) after timeout");
776       }
777       Set<String> failedDeletions =
778           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
779               .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
780       // Retry previously failed deletes
781       if (failedDeletions.size() > 0) {
782         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
783         for (String tmpPath : tmpPaths) {
784           // deleteNode is an async call
785           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
786               .getSplitLogManagerCoordination().deleteTask(tmpPath);
787         }
788         failedDeletions.removeAll(tmpPaths);
789       }
790 
791       // Garbage collect left-over
792       long timeInterval =
793           EnvironmentEdgeManager.currentTime()
794               - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
795                   .getSplitLogManagerCoordination().getLastRecoveryTime();
796       if (!failedRecoveringRegionDeletions.isEmpty()
797           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
798         // removeRecoveringRegions() below does more checks before GCing anything
799         if (!failedRecoveringRegionDeletions.isEmpty()) {
800           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
801               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
802           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
803           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
804             removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
805           }
806         } else {
807           removeRecoveringRegions(null, null);
808         }
809       }
810     }
811   }
812 
813   public enum ResubmitDirective {
814     CHECK(), FORCE();
815   }
816 
817   public enum TerminationStatus {
818     IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
819 
820     String statusMsg;
821 
822     TerminationStatus(String msg) {
823       statusMsg = msg;
824     }
825 
826     @Override
827     public String toString() {
828       return statusMsg;
829     }
830   }
831 }