1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.hbase.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.CoordinatedStateManager;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.Server;
52  import org.apache.hadoop.hbase.ServerName;
53  import org.apache.hadoop.hbase.SplitLogCounters;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
56  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
57  import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
58  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
61  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
62  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
63  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64  import org.apache.hadoop.hbase.util.FSUtils;
65  import org.apache.hadoop.hbase.util.Pair;
66  import org.apache.hadoop.hbase.util.Threads;
67  
68  import com.google.common.annotations.VisibleForTesting;
69  
70  /**
71   * Distributes the task of log splitting to the available region servers.
72   * Coordination happens via the coordination engine. For every log file that has to be split a
73   * task is created. SplitLogWorkers race to grab a task.
74   *
75   * <p>SplitLogManager monitors the tasks that it creates using the
76   * timeoutMonitor thread. If a task's progress is slow then
77   * {@link SplitLogManagerCoordination#checkTasks} will take away the
78   * task from the owner {@link SplitLogWorker} and the task will be up for grabs again. When the
79   * task is done then it is deleted by SplitLogManager.
80   *
81   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
82   * log files. The caller thread waits in this method until all the log files
83   * have been split.
84   *
85   * <p>All the coordination calls made by this class are asynchronous. This is mainly
86   * to help reduce response time seen by the callers.
87   *
88   * <p>There is a race in this design between the SplitLogManager and the
89   * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
90   * already been completed by a SplitLogWorker. We rely on the idempotency of
91   * the log splitting task for correctness.
92   *
93   * <p>It is also assumed that every log splitting task is unique and once
94   * completed (either with success or with error) it will not be submitted
95   * again. If a task is resubmitted then there is a risk that the old "delete task"
96   * can delete the re-submission.
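     *
     * <p>A minimal usage sketch; the {@code masterServices} and {@code serverLogDir} variables
     * below are illustrative placeholders, not part of this class:
     * <pre>{@code
     * SplitLogManager slm = new SplitLogManager(server, conf, stopper, masterServices, serverName);
     * // Split every WAL under one failed region server's log directory and block until done.
     * long bytesSplit = slm.splitLogDistributed(serverLogDir);
     * }</pre>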
97   */
98  @InterfaceAudience.Private
99  public class SplitLogManager {
100   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
101 
102   private Server server;
103 
104   private final Stoppable stopper;
105   private FileSystem fs;
106   private Configuration conf;
107 
108   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); // 3 min
109 
110   private long unassignedTimeout;
111   private long lastTaskCreateTime = Long.MAX_VALUE;
112   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
113   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
114       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
115 
116   /**
117    * In distributedLogReplay mode, we need to touch both splitlog and recovering-regions znodes in one
118    * operation. So the lock is used to guard such cases.
119    */
120   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
121 
122   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
123   private TimeoutMonitor timeoutMonitor;
124 
125   private volatile Set<ServerName> deadWorkers = null;
126   private final Object deadWorkersLock = new Object();
127 
128   /**
129   * It's OK to construct this object even when region servers are not online. It does look up
130   * the orphan tasks in the coordination engine but it doesn't block waiting for them to be done.
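       * <p>A tuning sketch; the values shown are just the defaults, repeated here for
       * illustration rather than as recommendations:
       * <pre>{@code
       * Configuration conf = HBaseConfiguration.create();
       * // How long tasks may stay unassigned before the timeout monitor re-checks them.
       * conf.setInt("hbase.splitlog.manager.unassigned.timeout", 3 * 60 * 1000);
       * // How often the TimeoutMonitor chore wakes up, in milliseconds.
       * conf.setInt("hbase.splitlog.manager.timeoutmonitor.period", 1000);
       * }</pre>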
131    * @param server the server instance
132    * @param conf the HBase configuration
133    * @param stopper the stoppable in case anything is wrong
134    * @param master the master services
135    * @param serverName the master server name
136    * @throws IOException
137    */
138   public SplitLogManager(Server server, Configuration conf, Stoppable stopper,
139       MasterServices master, ServerName serverName) throws IOException {
140     this.server = server;
141     this.conf = conf;
142     this.stopper = stopper;
143     if (server.getCoordinatedStateManager() != null) {
144       SplitLogManagerCoordination coordination =
145           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
146               .getSplitLogManagerCoordination();
147       Set<String> failedDeletions = Collections.synchronizedSet(new HashSet<String>());
148       SplitLogManagerDetails details =
149           new SplitLogManagerDetails(tasks, master, failedDeletions, serverName);
150       coordination.init();
151       coordination.setDetails(details);
152       // Determine recovery mode
153     }
154     this.unassignedTimeout =
155         conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
156     this.timeoutMonitor =
157         new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
158             stopper);
159     Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
160         + ".splitLogManagerTimeoutMonitor");
161   }
162 
163   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
164     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
165     for (Path hLogDir : logDirs) {
166       this.fs = hLogDir.getFileSystem(conf);
167       if (!fs.exists(hLogDir)) {
168         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
169         continue;
170       }
171       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
172       if (logfiles == null || logfiles.length == 0) {
173         LOG.info(hLogDir + " is empty dir, no logs to split");
174       } else {
175         Collections.addAll(fileStatus, logfiles);
176       }
177     }
178     FileStatus[] a = new FileStatus[fileStatus.size()];
179     return fileStatus.toArray(a);
180   }
181 
182   /**
183    * @param logDir one region server hlog dir path in .logs
184    * @return cumulative size of the logfiles split
185    * @throws IOException if there was an error while splitting any log file
187    */
188   public long splitLogDistributed(final Path logDir) throws IOException {
189     List<Path> logDirs = new ArrayList<Path>();
190     logDirs.add(logDir);
191     return splitLogDistributed(logDirs);
192   }
193 
194   /**
195    * The caller will block until all the log files of the given region server have been processed -
196    * successfully split or an error is encountered - by an available worker region server. This
197    * method must only be called after the region servers have been brought online.
198    * @param logDirs List of log dirs to split
199    * @throws IOException If there was an error while splitting any log file
200    * @return cumulative size of the logfiles split
201    */
202   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
203     if (logDirs.isEmpty()) {
204       return 0;
205     }
206     Set<ServerName> serverNames = new HashSet<ServerName>();
207     for (Path logDir : logDirs) {
208       try {
209         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
210         if (serverName != null) {
211           serverNames.add(serverName);
212         }
213       } catch (IllegalArgumentException e) {
214         // ignore invalid format error.
215         LOG.warn("Cannot parse server name from " + logDir);
216       }
217     }
218     return splitLogDistributed(serverNames, logDirs, null);
219   }
220 
221   /**
222    * The caller will block until all the log files of the given region servers have been
223    * processed - successfully split or an error is encountered - by an available worker region
224    * server. This method must only be called after the region servers have been brought online.
225    * @param serverNames servers whose log directories are being split
226    * @param logDirs List of log dirs to split
227    * @param filter the PathFilter used to select specific log files for splitting
228    * @return cumulative size of the logfiles split
229    * @throws IOException If there was an error while splitting any log file
229    */
230   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
231       PathFilter filter) throws IOException {
232     MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
233       logDirs + " for serverName=" + serverNames);
234     FileStatus[] logfiles = getFileList(logDirs, filter);
235     status.setStatus("Checking directory contents...");
236     LOG.debug("Scheduling batch of logs to split");
237     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
238     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs +
239       " for " + serverNames);
240     long t = EnvironmentEdgeManager.currentTime();
241     long totalSize = 0;
242     TaskBatch batch = new TaskBatch();
243     Boolean isMetaRecovery = (filter == null) ? null : false;
244     for (FileStatus lf : logfiles) {
245       // TODO If the log file is still being written to - which is most likely
246       // the case for the last log file - then its length will show up here
247       // as zero. The size of such a file can only be retrieved after
248       // recover-lease is done. totalSize will be underestimated in most cases and the
249       // metrics that it drives will also be under-reported.
250       totalSize += lf.getLen();
251       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
252       if (!enqueueSplitTask(pathToLog, batch)) {
253         throw new IOException("duplicate log split scheduled for " + lf.getPath());
254       }
255     }
256     waitForSplittingCompletion(batch, status);
257     // remove recovering regions
258     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
259       // we split meta regions and user regions separately, therefore the logfiles are either
260       // all for meta or all for user regions, but never both (we could have mixed situations in tests)
261       isMetaRecovery = true;
262     }
263     removeRecoveringRegions(serverNames, isMetaRecovery);
264 
265     if (batch.done != batch.installed) {
266       batch.isDead = true;
267       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
268       LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
269           + " but only " + batch.done + " done");
270       String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
271       status.abort(msg);
272       throw new IOException(msg);
273     }
274     for (Path logDir : logDirs) {
275       status.setStatus("Cleaning up log directory...");
276       try {
277         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
278           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
279         }
280       } catch (IOException ioe) {
281         FileStatus[] files = fs.listStatus(logDir);
282         if (files != null && files.length > 0) {
283           LOG.warn("returning success without actually splitting and "
284               + "deleting all the log files in path " + logDir);
285         } else {
286           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
287         }
288       }
289       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
290     }
291     String msg =
292         "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
293             + " log files in " + logDirs + " in "
294             + (EnvironmentEdgeManager.currentTime() - t) + "ms";
295     status.markComplete(msg);
296     LOG.info(msg);
297     return totalSize;
298   }
299 
300   /**
301    * Add a task entry to coordination if it is not already there.
302    * @param taskname the path of the log to be split
303    * @param batch the batch this task belongs to
304    * @return true if a new entry is created, false if it is already there.
305    */
306   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
307     lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
308     String task =
309         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
310             .getSplitLogManagerCoordination().prepareTask(taskname);
311     Task oldtask = createTaskIfAbsent(task, batch);
312     if (oldtask == null) {
313       // publish the task in the coordination engine
314       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
315           .getSplitLogManagerCoordination().submitTask(task);
316       return true;
317     }
318     return false;
319   }
320 
321   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
322     synchronized (batch) {
323       while ((batch.done + batch.error) != batch.installed) {
324         try {
325           status.setStatus("Waiting for distributed tasks to finish. scheduled="
326               + batch.installed + " done=" + batch.done + " error=" + batch.error);
327           int remaining = batch.installed - (batch.done + batch.error);
328           int actual = activeTasks(batch);
329           if (remaining != actual) {
330             LOG.warn("Expected " + remaining + " active tasks, but actually there are " + actual);
331           }
332           int remainingTasks =
333               ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
334                   .getSplitLogManagerCoordination().remainingTasksInCoordination();
335           if (remainingTasks >= 0 && actual > remainingTasks) {
336             LOG.warn("Expected at least " + actual + " tasks remaining, but actually there are "
337                 + remainingTasks);
338           }
339           if (remainingTasks == 0 || actual == 0) {
340             LOG.warn("No more tasks remaining, splitting "
341                 + "should have completed. Remaining tasks is " + remainingTasks
342                 + ", active tasks in map " + actual);
343             if (remainingTasks == 0 && actual == 0) {
344               return;
345             }
346           }
347           batch.wait(100);
348           if (stopper.isStopped()) {
349             LOG.warn("Stopped while waiting for log splits to be completed");
350             return;
351           }
352         } catch (InterruptedException e) {
353           LOG.warn("Interrupted while waiting for log splits to be completed");
354           Thread.currentThread().interrupt();
355           return;
356         }
357       }
358     }
359   }
360 
361   @VisibleForTesting
362   ConcurrentMap<String, Task> getTasks() {
363     return tasks;
364   }
365 
366   private int activeTasks(final TaskBatch batch) {
367     int count = 0;
368     for (Task t : tasks.values()) {
369       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
370         count++;
371       }
372     }
373     return count;
375   }
376 
377   /**
378    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
379    * region server hosting the region can allow reads to the recovered region.
380    * @param serverNames servers which have just been recovered
381    * @param isMetaRecovery whether the current recovery is for the meta region on
382    *          <code>serverNames</code>
383    */
384   private void removeRecoveringRegions(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
385     if (!isLogReplaying()) {
386       // the function is only used in WALEdit direct replay mode
387       return;
388     }
389 
390     Set<String> recoveredServerNameSet = new HashSet<String>();
391     if (serverNames != null) {
392       for (ServerName tmpServerName : serverNames) {
393         recoveredServerNameSet.add(tmpServerName.getServerName());
394       }
395     }
396 
397     try {
398       this.recoveringRegionLock.lock();
399       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
400           .getSplitLogManagerCoordination().removeRecoveringRegions(recoveredServerNameSet,
401             isMetaRecovery);
402     } catch (IOException e) {
403       LOG.warn("removeRecoveringRegions got exception. Will retry", e);
404       if (serverNames != null && !serverNames.isEmpty()) {
405         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
406             isMetaRecovery));
407       }
408     } finally {
409       this.recoveringRegionLock.unlock();
410     }
411   }
412 
413   /**
414    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
415    * during master initialization phase.
416    * @param failedServers A set of known failed servers
417    * @throws IOException
418    */
419   void removeStaleRecoveringRegions(final Set<ServerName> failedServers) throws IOException,
420       InterruptedIOException {
421     Set<String> knownFailedServers = new HashSet<String>();
422     if (failedServers != null) {
423       for (ServerName tmpServerName : failedServers) {
424         knownFailedServers.add(tmpServerName.getServerName());
425       }
426     }
427 
428     this.recoveringRegionLock.lock();
429     try {
430       ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
431           .getSplitLogManagerCoordination().removeStaleRecoveringRegions(knownFailedServers);
432     } finally {
433       this.recoveringRegionLock.unlock();
434     }
435   }
436 
437   /**
438    * @param path the path of the log to be split
439    * @param batch the batch this task belongs to
440    * @return null on success, existing task on error
441    */
442   private Task createTaskIfAbsent(String path, TaskBatch batch) {
443     Task oldtask;
444     // batch.installed is only changed via this function and
445     // a single thread touches batch.installed.
446     Task newtask = new Task();
447     newtask.batch = batch;
448     oldtask = tasks.putIfAbsent(path, newtask);
449     if (oldtask == null) {
450       batch.installed++;
451       return null;
452     }
453     // new task was not used.
454     synchronized (oldtask) {
455       if (oldtask.isOrphan()) {
456         if (oldtask.status == SUCCESS) {
457           // The task is already done. Do not install the batch for this
458           // task because it might be too late for setDone() to update
459           // batch.done. There is no need for the batch creator to wait for
460           // this task to complete.
461           return (null);
462         }
463         if (oldtask.status == IN_PROGRESS) {
464           oldtask.batch = batch;
465           batch.installed++;
466           LOG.debug("Previously orphan task " + path + " is now being waited upon");
467           return null;
468         }
469         while (oldtask.status == FAILURE) {
470           LOG.debug("wait for status of task " + path + " to change to DELETED");
471           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
472           try {
473             oldtask.wait();
474           } catch (InterruptedException e) {
475             Thread.currentThread().interrupt();
476             LOG.warn("Interrupted when waiting for znode delete callback");
477             // fall through to return failure
478             break;
479           }
480         }
481         if (oldtask.status != DELETED) {
482           LOG.warn("Failure because previously failed task"
483               + " state still present. Waiting for znode delete callback" + " path=" + path);
484           return oldtask;
485         }
486         // reinsert the newTask and it must succeed this time
487         Task t = tasks.putIfAbsent(path, newtask);
488         if (t == null) {
489           batch.installed++;
490           return null;
491         }
492         LOG.fatal("Logic error. Deleted task still present in tasks map");
493         assert false : "Deleted task still present in tasks map";
494         return t;
495       }
496       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
497       return oldtask;
498     }
499   }
500 
501   Task findOrCreateOrphanTask(String path) {
502     Task orphanTask = new Task();
503     Task task;
504     task = tasks.putIfAbsent(path, orphanTask);
505     if (task == null) {
506       LOG.info("creating orphan task " + path);
507       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
508       task = orphanTask;
509     }
510     return task;
511   }
512 
513   public void stop() {
514     if (timeoutMonitor != null) {
515       timeoutMonitor.interrupt();
516     }
517   }
518 
519   void handleDeadWorker(ServerName workerName) {
520     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
521     // to reason about concurrency. Makes it easier to retry.
522     synchronized (deadWorkersLock) {
523       if (deadWorkers == null) {
524         deadWorkers = new HashSet<ServerName>(100);
525       }
526       deadWorkers.add(workerName);
527     }
528     LOG.info("dead splitlog worker " + workerName);
529   }
530 
531   void handleDeadWorkers(Set<ServerName> serverNames) {
532     synchronized (deadWorkersLock) {
533       if (deadWorkers == null) {
534         deadWorkers = new HashSet<ServerName>(100);
535       }
536       deadWorkers.addAll(serverNames);
537     }
538     LOG.info("dead splitlog workers " + serverNames);
539   }
540 
541   /**
542    * Sets the recovery mode based on outstanding split log tasks from a previous run or on the
543    * current configuration setting.
544    * @param isForInitialization true when called as part of master initialization
545    * @throws IOException if it's impossible to set the recovery mode
546    */
547   public void setRecoveryMode(boolean isForInitialization) throws IOException {
548     ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
549         .getSplitLogManagerCoordination().setRecoveryMode(isForInitialization);
550 
551   }
552 
553   public void markRegionsRecovering(ServerName server, Set<HRegionInfo> userRegions)
554       throws InterruptedIOException, IOException {
555     if (userRegions == null || (!isLogReplaying())) {
556       return;
557     }
558     try {
559       this.recoveringRegionLock.lock();
560       // mark that we're creating recovering regions
561       ((BaseCoordinatedStateManager) this.server.getCoordinatedStateManager())
562           .getSplitLogManagerCoordination().markRegionsRecovering(server, userRegions);
563     } finally {
564       this.recoveringRegionLock.unlock();
565     }
566 
567   }
568 
569   /**
570    * @return whether log is replaying
571    */
572   public boolean isLogReplaying() {
573     CoordinatedStateManager m = server.getCoordinatedStateManager();
574     if (m == null) return false;
575     return ((BaseCoordinatedStateManager)m).getSplitLogManagerCoordination().isReplaying();
576   }
577 
578   /**
579    * @return whether log is splitting
580    */
581   public boolean isLogSplitting() {
582     if (server.getCoordinatedStateManager() == null) return false;
583     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
584         .getSplitLogManagerCoordination().isSplitting();
585   }
586 
587   /**
588    * @return the current log recovery mode
589    */
590   public RecoveryMode getRecoveryMode() {
591     return ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
592         .getSplitLogManagerCoordination().getRecoveryMode();
593   }
594 
595   /**
596    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
597    * Client threads use this object to wait for all their tasks to be done.
598    * <p>
599    * All access is synchronized.
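       * <p>A rough sketch of how the manager itself waits on a batch (it mirrors the private
       * {@code waitForSplittingCompletion} method; the counters are updated as tasks finish):
       * <pre>{@code
       * synchronized (batch) {
       *   while (batch.done + batch.error != batch.installed) {
       *     batch.wait(100);  // re-checked as individual split tasks complete
       *   }
       * }
       * }</pre>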
600    */
601   @InterfaceAudience.Private
602   public static class TaskBatch {
603     public int installed = 0;
604     public int done = 0;
605     public int error = 0;
606     public volatile boolean isDead = false;
607 
608     @Override
609     public String toString() {
610       return ("installed = " + installed + " done = " + done + " error = " + error);
611     }
612   }
613 
614   /**
615    * In-memory state of an active task.
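       * <p>A sketch of the typical lifecycle as driven by worker heartbeats; {@code version} and
       * {@code workerName} below are illustrative values:
       * <pre>{@code
       * Task task = new Task();  // starts IN_PROGRESS and unassigned
       * task.heartbeat(EnvironmentEdgeManager.currentTime(), version, workerName);
       * task.setUnassigned();    // clears the worker and last_update so the task counts as unassigned again
       * }</pre>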
616    */
617   @InterfaceAudience.Private
618   public static class Task {
619     public volatile long last_update;
620     public volatile int last_version;
621     public volatile ServerName cur_worker_name;
622     public volatile TaskBatch batch;
623     public volatile TerminationStatus status;
624     public volatile int incarnation;
625     public final AtomicInteger unforcedResubmits = new AtomicInteger();
626     public volatile boolean resubmitThresholdReached;
627 
628     @Override
629     public String toString() {
630       return ("last_update = " + last_update + " last_version = " + last_version
631           + " cur_worker_name = " + cur_worker_name + " status = " + status + " incarnation = "
632           + incarnation + " resubmits = " + unforcedResubmits.get() + " batch = " + batch);
633     }
634 
635     public Task() {
636       incarnation = 0;
637       last_version = -1;
638       status = IN_PROGRESS;
639       setUnassigned();
640     }
641 
642     public boolean isOrphan() {
643       return (batch == null || batch.isDead);
644     }
645 
646     public boolean isUnassigned() {
647       return (cur_worker_name == null);
648     }
649 
650     public void heartbeatNoDetails(long time) {
651       last_update = time;
652     }
653 
654     public void heartbeat(long time, int version, ServerName worker) {
655       last_version = version;
656       last_update = time;
657       cur_worker_name = worker;
658     }
659 
660     public void setUnassigned() {
661       cur_worker_name = null;
662       last_update = -1;
663     }
664   }
665 
666   /**
667    * Periodically checks all active tasks and resubmits the ones that have timed out
668    */
669   private class TimeoutMonitor extends Chore {
670     private long lastLog = 0;
671 
672     public TimeoutMonitor(final int period, Stoppable stopper) {
673       super("SplitLogManager Timeout Monitor", period, stopper);
674     }
675 
676     @Override
677     protected void chore() {
678       int resubmitted = 0;
679       int unassigned = 0;
680       int tot = 0;
681       boolean found_assigned_task = false;
682       Set<ServerName> localDeadWorkers;
683 
684       synchronized (deadWorkersLock) {
685         localDeadWorkers = deadWorkers;
686         deadWorkers = null;
687       }
688 
689       for (Map.Entry<String, Task> e : tasks.entrySet()) {
690         String path = e.getKey();
691         Task task = e.getValue();
692         ServerName cur_worker = task.cur_worker_name;
693         tot++;
694         // don't easily resubmit a task which hasn't been picked up yet. It
695         // might be a long while before a SplitLogWorker is free to pick up a
696         // task. This is because a SplitLogWorker picks up a task one at a
697         // time. If we want progress when there are no region servers then we
698         // will have to run a SplitLogWorker thread in the Master.
699         if (task.isUnassigned()) {
700           unassigned++;
701           continue;
702         }
703         found_assigned_task = true;
704         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
705           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
706           if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
707               .getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
708             resubmitted++;
709           } else {
710             handleDeadWorker(cur_worker);
711             LOG.warn("Failed to resubmit task " + path + " owned by dead " + cur_worker
712                 + ", will retry.");
713           }
714         } else if (((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
715             .getSplitLogManagerCoordination().resubmitTask(path, task, CHECK)) {
716           resubmitted++;
717         }
718       }
719       if (tot > 0) {
720         long now = EnvironmentEdgeManager.currentTime();
721         if (now > lastLog + 5000) {
722           lastLog = now;
723           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
724         }
725       }
726       if (resubmitted > 0) {
727         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
728       }
729       // If there are pending tasks and all of them have been unassigned for
730       // some time then put up a RESCAN node to ping the workers.
731       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
732       // because a. it is very unlikely that every worker had a
733       // transient error when trying to grab the task b. if there are no
734       // workers then all tasks will stay unassigned indefinitely and the
735       // manager will be indefinitely creating RESCAN nodes. TODO maybe the
736       // master should spawn both a manager and a worker thread to guarantee
737       // that there is always one worker in the system
738       if (tot > 0
739           && !found_assigned_task
740           && ((EnvironmentEdgeManager.currentTime() - lastTaskCreateTime) > unassignedTimeout)) {
741         for (Map.Entry<String, Task> e : tasks.entrySet()) {
742           String key = e.getKey();
743           Task task = e.getValue();
744           // we have to do task.isUnassigned() check again because tasks might
745           // have been asynchronously assigned. There is no locking required
746           // for these checks ... it is OK even if tryGetDataSetWatch() is
747           // called unnecessarily for a taskpath
748           if (task.isUnassigned() && (task.status != FAILURE)) {
749             // We just touch the znode to make sure its still there
750             ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
751                 .getSplitLogManagerCoordination().checkTaskStillAvailable(key);
752           }
753         }
754         ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
755             .getSplitLogManagerCoordination().checkTasks();
756         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
757         LOG.debug("resubmitting unassigned task(s) after timeout");
758       }
759       Set<String> failedDeletions =
760           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
761               .getSplitLogManagerCoordination().getDetails().getFailedDeletions();
762       // Retry previously failed deletes
763       if (failedDeletions.size() > 0) {
764         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
765         for (String tmpPath : tmpPaths) {
766           // deleteNode is an async call
767           ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
768               .getSplitLogManagerCoordination().deleteTask(tmpPath);
769         }
770         failedDeletions.removeAll(tmpPaths);
771       }
772 
773       // Garbage collect left-over recovering regions
774       long timeInterval =
775           EnvironmentEdgeManager.currentTime()
776               - ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
777                   .getSplitLogManagerCoordination().getLastRecoveryTime();
778       if (!failedRecoveringRegionDeletions.isEmpty()
779           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
780         // inside the function there are more checks before GC'ing anything
781         if (!failedRecoveringRegionDeletions.isEmpty()) {
782           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
783               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
784           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
785           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
786             removeRecoveringRegions(failedDeletion.getFirst(), failedDeletion.getSecond());
787           }
788         } else {
789           removeRecoveringRegions(null, null);
790         }
791       }
792     }
793   }
794 
795   public enum ResubmitDirective {
796     CHECK(), FORCE();
797   }
798 
799   public enum TerminationStatus {
800     IN_PROGRESS("in_progress"), SUCCESS("success"), FAILURE("failure"), DELETED("deleted");
801 
802     String statusMsg;
803 
804     TerminationStatus(String msg) {
805       statusMsg = msg;
806     }
807 
808     @Override
809     public String toString() {
810       return statusMsg;
811     }
812   }
813 }