/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;

import com.google.common.annotations.VisibleForTesting;

/**
 * Distributes the task of log splitting to the available region servers.
 * Coordination happens via the coordination engine. For every log file that has to be split, a
 * task is created. SplitLogWorkers race to grab a task.
 *
 * <p>SplitLogManager monitors the tasks that it creates using the
 * timeoutMonitor thread. If a task's progress is slow then
 * {@link SplitLogManagerCoordination#checkTasks} will take away the
 * task from the owner {@link org.apache.hadoop.hbase.regionserver.SplitLogWorker}
 * and the task will be up for grabs again. When the task is done it is deleted
 * by SplitLogManager.
 *
 * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
 * log files. The caller thread waits in this method until all the log files
 * have been split.
 *
 * <p>All the coordination calls made by this class are asynchronous. This is mainly
 * to help reduce response time seen by the callers.
 *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that the old "delete task"
 * can delete the re-submission.
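 *
 * <p>A rough usage sketch follows; the {@code server}, {@code conf}, {@code stopper},
 * {@code masterServices}, {@code serverName} and {@code walDir} values are illustrative
 * placeholders, not part of this class:
 * <pre>{@code
 * SplitLogManager slm = new SplitLogManager(server, conf, stopper, masterServices, serverName);
 * // Blocks until every WAL under the directory has been split by some SplitLogWorker.
 * long bytesSplit = slm.splitLogDistributed(new Path(walDir));
 * slm.stop();
 * }</pre>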
 */
@InterfaceAudience.Private
public class SplitLogManager {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  private Server server;

  private final Stoppable stopper;
  private final Configuration conf;
  private final ChoreService choreService;

  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min

  private long unassignedTimeout;
  private long lastTaskCreateTime = Long.MAX_VALUE;
  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
  private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
      .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());

  /**
   * In distributedLogReplay mode, we need to touch both the splitlog and recovering-regions
   * znodes in one operation, so this lock guards those cases.
   */
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  /**
   * It's OK to construct this object even when region servers are not online. It looks up
   * orphan tasks in the coordination engine but does not block waiting for them to be done.
   * @param server the server instance
   * @param conf the HBase configuration
   * @param stopper the stoppable in case anything is wrong
   * @param master the master services
   * @param serverName the master server name
   * @throws IOException
   */
  public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
      MasterServices master, ServerName serverName) throws IOException {
    this.server = server;
    this.conf = conf;
    this.stopper = stopper;
    this.choreService = new ChoreService(serverName.toString() + "_splitLogManager_");
    if (server.getCoordinatedStateManager() != null) {
      SplitLogManagerCoordination coordination =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination();
      Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
      SplitLogManagerDetails details =
          new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
      coordination.setDetails(details);
      coordination.init();
      // Determine recovery mode
    }
    this.unassignedTimeout =
        conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
    this.timeoutMonitor =
        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
            stopper);
    choreService.scheduleChore(timeoutMonitor);
  }

  private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
    return getFileList(conf, logDirs, filter);
  }

  /**
   * Get a list of paths that need to be split given a set of server-specific directories and
   * optionally a filter.
   *
   * See {@link DefaultWALProvider#getServerNameFromWALDirectoryName} for more info on directory
   * layout.
   *
   * Should be package-private, but is needed by
   * {@link org.apache.hadoop.hbase.wal.WALSplitter#split(Path, Path, Path, FileSystem,
   *     Configuration, WALFactory)} for tests.
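   *
   * <p>A minimal usage sketch, assuming a {@code logDir} such as the one below (the path is
   * illustrative only):
   * <pre>{@code
   * Path logDir = new Path("/hbase/WALs/host1.example.org,16020,1453125000000");
   * FileStatus[] logs = SplitLogManager.getFileList(conf, Collections.singletonList(logDir), null);
   * }</pre>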
   */
  @VisibleForTesting
  public static FileStatus[] getFileList(final Configuration conf, final List<Path> logDirs,
      final PathFilter filter)
      throws IOException {
    List<FileStatus> fileStatus = new ArrayList<FileStatus>();
    for (Path logDir : logDirs) {
      final FileSystem fs = logDir.getFileSystem(conf);
      if (!fs.exists(logDir)) {
        LOG.warn(logDir + " doesn't exist. Nothing to do!");
        continue;
      }
      FileStatus[] logfiles = FSUtils.listStatus(fs, logDir, filter);
      if (logfiles == null || logfiles.length == 0) {
        LOG.info(logDir + " is an empty dir, no logs to split");
      } else {
        Collections.addAll(fileStatus, logfiles);
      }
    }
    FileStatus[] a = new FileStatus[fileStatus.size()];
    return fileStatus.toArray(a);
  }

  /**
   * @param logDir one region server WAL dir path in .logs
   * @throws IOException if there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Path logDir) throws IOException {
    List<Path> logDirs = new ArrayList<Path>();
    logDirs.add(logDir);
    return splitLogDistributed(logDirs);
  }

  /**
   * The caller will block until all the log files of the given region server have been processed -
   * successfully split or an error is encountered - by an available worker region server. This
   * method must only be called after the region servers have been brought online.
   * @param logDirs List of log dirs to split
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final List<Path> logDirs) throws IOException {
    if (logDirs.isEmpty()) {
      return 0;
    }
    Set<ServerName> serverNames = new HashSet<ServerName>();
    for (Path logDir : logDirs) {
      try {
        ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logDir);
        if (serverName != null) {
          serverNames.add(serverName);
        }
      } catch (IllegalArgumentException e) {
        // ignore invalid format error.
        LOG.warn("Cannot parse server name from " + logDir);
      }
    }
    return splitLogDistributed(serverNames, logDirs, null);
  }

  /**
   * The caller will block until all the log files of the given region servers that are selected
   * by the filter have been processed - successfully split or an error is encountered - by an
   * available worker region server. This method must only be called after the region servers
   * have been brought online.
   * @param serverNames servers whose WALs are being split; used afterwards to clean up the
   *          corresponding recovering-region state
   * @param logDirs List of log dirs to split
   * @param filter the PathFilter used to select specific log files for splitting
   * @throws IOException If there was an error while splitting any log file
   * @return cumulative size of the logfiles split
   */
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    FileStatus[] logfiles = getFileList(logDirs, filter);
    status.setStatus("Checking directory contents...");
    LOG.debug("Scheduling batch of logs to split");
    SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
    LOG.info("started splitting " + logfiles.length + " logs in " + logDirs +
      " for " + serverNames);
    long t = EnvironmentEdgeManager.currentTime();
    long totalSize = 0;
    TaskBatch batch = new TaskBatch();
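    // isMetaRecovery is tri-state: null means no filter was supplied (the batch may contain both
    // meta and user-region logs), false means a filtered but non-meta batch; it is flipped to
    // true below once the batch is known to have been selected by META_FILTER.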
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      // TODO If the log file is still being written to - which is most likely
      // the case for the last log file - then its length will show up here
      // as zero. The size of such a file can only be retrieved after
      // recover-lease is done. totalSize will be an underestimate in most cases
      // and the metrics that it drives will also be under-reported.
      totalSize += lf.getLen();
      String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);
    // remove recovering regions
    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      // we split meta regions and user regions separately, therefore logfiles are either all for
      // meta or all for user regions but never for both (we could have mixed situations in tests)
      isMetaRecovery = true;
    }
    removeRecoveringRegions(serverNames, isMetaRecovery);

    if (batch.done != batch.installed) {
      batch.isDead = true;
      SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
      LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
          + " but only " + batch.done + " done");
      String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
      status.abort(msg);
      throw new IOException(msg);
    }
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      } catch (IOException ioe) {
        FileStatus[] files = fs.listStatus(logDir);
        if (files != null && files.length > 0) {
          LOG.warn("returning success without actually splitting and "
              + "deleting all the log files in path " + logDir);
        } else {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
        }
      }
      SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
    }
    String msg =
        "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
            + " log files in " + logDirs + " in "
            + (EnvironmentEdgeManager.currentTime() - t) + "ms";
    status.markComplete(msg);
    LOG.info(msg);
    return totalSize;
  }

  /**
   * Add a task entry to coordination if it is not already there.
   * @param taskname the path of the log to be split
   * @param batch the batch this task belongs to
   * @return true if a new entry is created, false if it is already there.
   */
  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
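    // prepareTask translates the log path into the coordination-specific task name (with the
    // default ZooKeeper-based coordination this is the task's znode path).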
    String task =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }

  private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
    synchronized (batch) {
      while ((batch.done + batch.error) != batch.installed) {
        try {
          status.setStatus("Waiting for distributed tasks to finish. scheduled="
              + batch.installed + " done=" + batch.done + " error=" + batch.error);
          int remaining = batch.installed - (batch.done + batch.error);
          int actual = activeTasks(batch);
          if (remaining != actual) {
            LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
          }
          int remainingTasks =
              ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().remainingTasksInCoordination();
          if (remainingTasks >= 0 && actual > remainingTasks) {
            LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
                + remainingTasks);
          }
          if (remainingTasks == 0 || actual == 0) {
            LOG.warn("No more tasks remaining, splitting "
                + "should have completed. Remaining tasks = " + remainingTasks
                + ", active tasks in map = " + actual);
            if (remainingTasks == 0 && actual == 0) {
              return;
            }
          }
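          // Wait with a timeout rather than indefinitely, so the stopper and the coordination
          // engine's task counts are re-checked periodically even if no notification arrives.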
          batch.wait(100);
          if (stopper.isStopped()) {
            LOG.warn("Stopped while waiting for log splits to be completed");
            return;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted while waiting for log splits to be completed");
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  @VisibleForTesting
  ConcurrentMap<String, Task> getTasks() {
    return tasks;
  }

  private int activeTasks(final TaskBatch batch) {
    int count = 0;
    for (Task t : tasks.values()) {
      if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
        count++;
      }
    }
    return count;
  }

  /**
   * Removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
   * region server hosting the region can allow reads to the recovered region.
   * @param serverNames servers which are just recovered
   * @param isMetaRecovery whether the current recovery is for the meta region on
   *          <code>serverNames</code>
   */
  private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
    if (!isLogReplaying()) {
      // this function is only used in WALEdit direct replay mode
      return;
    }

    Set<String> recoveredServerNameSet = new HashSet<String>();
    if (serverNames != null) {
      for (ServerName tmpServerName : serverNames) {
        recoveredServerNameSet.add(tmpServerName.getServerName());
      }
    }

    try {
      this.recoveringRegionLock.lock();
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
            isMetaRecovery);
    } catch (IOException e) {
      LOG.warn("removeRecoveringRegions got exception. Will retry", e);
      if (serverNames != null && !serverNames.isEmpty()) {
        this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
            isMetaRecovery));
      }
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * Removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
   * during the master initialization phase.
   * @param failedServers A set of known failed servers
   * @throws IOException
   */
  void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
      InterruptedIOException {
    Set<String> knownFailedServers = new HashSet<String>();
    if (failedServers != null) {
      for (ServerName tmpServerName : failedServers) {
        knownFailedServers.add(tmpServerName.getServerName());
      }
    }

    this.recoveringRegionLock.lock();
    try {
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * @param path the task name under which the split is tracked
   * @param batch the batch this task belongs to
   * @return null on success, existing task on error
   */
  private Task createTaskIfAbsent(String path, TaskBatch batch) {
    Task oldtask;
    // batch.installed is only changed via this function and
    // a single thread touches batch.installed.
    Task newtask = new Task();
    newtask.batch = batch;
    oldtask = tasks.putIfAbsent(path, newtask);
    if (oldtask == null) {
      batch.installed++;
      return null;
    }
    // new task was not used.
    synchronized (oldtask) {
      if (oldtask.isOrphan()) {
        if (oldtask.status == SUCCESS) {
          // The task is already done. Do not install the batch for this
          // task because it might be too late for setDone() to update
          // batch.done. There is no need for the batch creator to wait for
          // this task to complete.
          return (null);
        }
        if (oldtask.status == IN_PROGRESS) {
          oldtask.batch = batch;
          batch.installed++;
          LOG.debug("Previously orphan task " + path + " is now being waited upon");
          return null;
        }
        while (oldtask.status == FAILURE) {
          LOG.debug("wait for status of task " + path + " to change to DELETED");
          SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
          try {
            oldtask.wait();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted when waiting for znode delete callback");
            // fall through to return failure
            break;
          }
        }
        if (oldtask.status != DELETED) {
          LOG.warn("Failure because previously failed task"
              + " state still present. Waiting for znode delete callback" + " path=" + path);
          return oldtask;
        }
        // reinsert the newtask and it must succeed this time
        Task t = tasks.putIfAbsent(path, newtask);
        if (t == null) {
          batch.installed++;
          return null;
        }
        LOG.fatal("Logic error. Deleted task still present in tasks map");
        assert false : "Deleted task still present in tasks map";
        return t;
      }
      LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
      return oldtask;
    }
  }

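  /**
   * Returns the in-memory {@link Task} for the given path, creating a new orphan task (a task
   * with no associated batch) if this manager has not seen the task before.
   */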
  Task findOrCreateOrphanTask(String path) {
    Task orphanTask = new Task();
    Task task;
    task = tasks.putIfAbsent(path, orphanTask);
    if (task == null) {
      LOG.info("creating orphan task " + path);
      SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
      task = orphanTask;
    }
    return task;
  }

  public void stop() {
    if (choreService != null) {
      choreService.shutdown();
    }
    if (timeoutMonitor != null) {
      timeoutMonitor.cancel(true);
    }
  }

  void handleDeadWorker(ServerName workerName) {
    // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
    // to reason about concurrency. Makes it easier to retry.
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.add(workerName);
    }
    LOG.info("dead splitlog worker " + workerName);
  }

  void handleDeadWorkers(Set<ServerName> serverNames) {
    synchronized (deadWorkersLock) {
      if (deadWorkers == null) {
        deadWorkers = new HashSet<ServerName>(100);
      }
      deadWorkers.addAll(serverNames);
    }
    LOG.info("dead splitlog workers " + serverNames);
  }

  /**
   * Sets the recovery mode, either from outstanding split log tasks left over from a previous
   * run or from the current configuration setting.
   * @param isForInitialization
   * @throws IOException if it's impossible to set the recovery mode
   */
  public void setRecoveryMode(boolean isForInitialization) throws IOException {
    ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
  }

  public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
      throws InterruptedIOException, IOException {
    if (userRegions == null || (!isLogReplaying())) {
      return;
    }
    try {
      this.recoveringRegionLock.lock();
      // mark that we're creating recovering regions
      ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
    } finally {
      this.recoveringRegionLock.unlock();
    }
  }

  /**
   * @return whether log replay is in progress
   */
  public boolean isLogReplaying() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isReplaying();
  }

  /**
   * @return whether log splitting is in progress
   */
  public boolean isLogSplitting() {
    if (server.getCoordinatedStateManager() == null) return false;
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().isSplitting();
  }

  /**
   * @return the current log recovery mode
   */
  public RecoveryMode getRecoveryMode() {
    return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
        .getSplitLogManagerCoordination().getRecoveryMode();
  }

  /**
   * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
   * Client threads use this object to wait for all their tasks to be done.
   * <p>
   * All access is synchronized.
   */
  @InterfaceAudience.Private
  public static class TaskBatch {
    public int installed = 0;
    public int done = 0;
    public int error = 0;
    public volatile boolean isDead = false;

    @Override
    public String toString() {
      return ("installed = " + installed + " done = " + done + " error = " + error);
    }
  }

  /**
   * In-memory state of an active task.
   */
  @InterfaceAudience.Private
  public static class Task {
    public volatile long last_update;
    public volatile int last_version;
    public volatile ServerName cur_worker_name;
    public volatile TaskBatch batch;
    public volatile TerminationStatus status;
    public volatile int incarnation;
    public final AtomicInteger unforcedResubmits = new AtomicInteger();
    public volatile boolean resubmitThresholdReached;

    @Override
    public String toString() {
      return ("last_update = " + last_update + " last_version = " + last_version
          + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
          + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
    }

    public Task() {
      incarnation = 0;
      last_version = -1;
      status = IN_PROGRESS;
      setUnassigned();
    }

    public boolean isOrphan() {
      return (batch == null || batch.isDead);
    }

    public boolean isUnassigned() {
      return (cur_worker_name == null);
    }

    public void heartbeatNoDetails(long time) {
      last_update = time;
    }

    public void heartbeat(long time, int version, ServerName worker) {
      last_version = version;
      last_update = time;
      cur_worker_name = worker;
    }

    public void setUnassigned() {
      cur_worker_name = null;
      last_update = -1;
    }
  }

  /**
   * Periodically checks all active tasks and resubmits the ones that have timed out.
   */
  private class TimeoutMonitor extends ScheduledChore {
    private long lastLog = 0;

    public TimeoutMonitor(final int period, Stoppable stopper) {
      super("SplitLogManager Timeout Monitor", stopper, period);
    }

    @Override
    protected void chore() {
      int resubmitted = 0;
      int unassigned = 0;
      int tot = 0;
      boolean found_assigned_task = false;
      Set<ServerName> localDeadWorkers;

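      // Take a snapshot of the dead-worker set and reset it, so that workers reported dead while
      // this chore runs are picked up on the next iteration.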
      synchronized (deadWorkersLock) {
        localDeadWorkers = deadWorkers;
        deadWorkers = null;
      }

      for (Map.Entry<String, Task> e : tasks.entrySet()) {
        String path = e.getKey();
        Task task = e.getValue();
        ServerName cur_worker = task.cur_worker_name;
        tot++;
        // don't easily resubmit a task which hasn't been picked up yet. It
        // might be a long while before a SplitLogWorker is free to pick up a
        // task. This is because a SplitLogWorker picks up a task one at a
        // time. If we want progress when there are no region servers then we
        // will have to run a SplitLogWorker thread in the Master.
        if (task.isUnassigned()) {
          unassigned++;
          continue;
        }
        found_assigned_task = true;
        if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
          SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
          if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
            resubmitted++;
          } else {
            handleDeadWorker(cur_worker);
            LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
                + ", will retry.");
          }
        } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
          resubmitted++;
        }
      }
      if (tot > 0) {
        long now = EnvironmentEdgeManager.currentTime();
        if (now > lastLog + 5000) {
          lastLog = now;
          LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
        }
      }
      if (resubmitted > 0) {
        LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
      }
      // If there are pending tasks and all of them have been unassigned for
      // some time then put up a RESCAN node to ping the workers.
      // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
      // because a. it is very unlikely that every worker had a
      // transient error when trying to grab the task b. if there are no
      // workers then all tasks will stay unassigned indefinitely and the
      // manager will be indefinitely creating RESCAN nodes. TODO maybe the
      // master should spawn both a manager and a worker thread to guarantee
      // that there is always one worker in the system
      if (tot > 0
          && !found_assigned_task
          && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
        for (Map.Entry<String, Task> e : tasks.entrySet()) {
          String key = e.getKey();
          Task task = e.getValue();
          // we have to do task.isUnassigned() check again because tasks might
          // have been asynchronously assigned. There is no locking required
          // for these checks ... it is OK even if tryGetDataSetWatch() is
          // called unnecessarily for a taskpath
          if (task.isUnassigned() && (task.status != FAILURE)) {
            // We just touch the znode to make sure it's still there
            ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
          }
        }
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().checkTasks();
        SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
        LOG.debug("resubmitting unassigned task(s) after timeout");
      }
      Set<String> failedDeletions =
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
      // Retry previously failed deletes
      if (failedDeletions.size() > 0) {
        List<String> tmpPaths = new ArrayList<String>(failedDeletions);
        for (String tmpPath : tmpPaths) {
          // deleteNode is an async call
          ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
              .getSplitLogManagerCoordination().deleteTask(tmpPath);
        }
        failedDeletions.removeAll(tmpPaths);
      }

      // Garbage collect left-over recovering-region state
      long timeInterval =
          EnvironmentEdgeManager.currentTime()
              - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
                  .getSplitLogManagerCoordination().getLastRecoveryTime();
      if (!failedRecoveringRegionDeletions.isEmpty()
          || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
        // the called function performs more checks before actually GC'ing anything
        if (!failedRecoveringRegionDeletions.isEmpty()) {
          List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
              new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
          failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
          for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
            removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
          }
        } else {
          removeRecoveringRegions(null, null);
        }
      }
    }
  }

  public enum ResubmitDirective {
    CHECK(), FORCE();
  }

  public enum TerminationStatus {
    IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");

    String statusMsg;

    TerminationStatus(String msg) {
      statusMsg = msg;
    }

    @Override
    public String toString() {
      return statusMsg;
    }
  }
}