1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.HashSet;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.Set;
34  import java.util.concurrent.ConcurrentHashMap;
35  import java.util.concurrent.ConcurrentMap;
36  import java.util.concurrent.locks.ReentrantLock;
37  
38  import org.apache.commons.logging.Log;
39  import org.apache.commons.logging.LogFactory;
40  import org.apache.hadoop.classification.InterfaceAudience;
41  import org.apache.hadoop.conf.Configuration;
42  import org.apache.hadoop.fs.FileStatus;
43  import org.apache.hadoop.fs.FileSystem;
44  import org.apache.hadoop.fs.Path;
45  import org.apache.hadoop.fs.PathFilter;
46  import org.apache.hadoop.hbase.Chore;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.ServerName;
50  import org.apache.hadoop.hbase.SplitLogCounters;
51  import org.apache.hadoop.hbase.SplitLogTask;
52  import org.apache.hadoop.hbase.Stoppable;
53  import org.apache.hadoop.hbase.exceptions.DeserializationException;
54  import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
55  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
56  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
57  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
58  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
59  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
60  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
61  import org.apache.hadoop.hbase.util.FSUtils;
62  import org.apache.hadoop.hbase.util.Threads;
63  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
64  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
65  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
66  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
67  import org.apache.hadoop.util.StringUtils;
68  import org.apache.zookeeper.AsyncCallback;
69  import org.apache.zookeeper.CreateMode;
70  import org.apache.zookeeper.KeeperException;
71  import org.apache.zookeeper.KeeperException.NoNodeException;
72  import org.apache.zookeeper.ZooDefs.Ids;
73  import org.apache.zookeeper.data.Stat;
74  
75  /**
76   * Distributes the task of log splitting to the available region servers.
77   * Coordination happens via zookeeper. For every log file that has to be split a
78   * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
79   *
80   * <p>SplitLogManager monitors the task znodes that it creates using the
81   * timeoutMonitor thread. If a task's progress is slow then
82   * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
83   * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
84   * task's znode is deleted by SplitLogManager.
85   *
86   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
87   * log files. The caller thread waits in this method until all the log files
88   * have been split.
89   *
90   * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
91   * to help reduce response time seen by the callers.
92   *
 * <p>There is a race in this design between the SplitLogManager and the
 * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
 * already been completed by a SplitLogWorker. We rely on the idempotency of
 * the log splitting task for correctness.
 *
 * <p>It is also assumed that every log splitting task is unique and once
 * completed (either with success or with error) it will not be submitted
 * again. If a task is resubmitted then there is a risk that an old "delete task"
 * can delete the re-submission.
102  */
103 @InterfaceAudience.Private
104 public class SplitLogManager extends ZooKeeperListener {
  private static final Log LOG = LogFactory.getLog(SplitLogManager.class);

  // Defaults for the hbase.splitlog.* settings read in the constructor.
  // NOTE(review): the timeouts appear to be milliseconds (see the "3 min" note below) - confirm.
  public static final int DEFAULT_TIMEOUT = 120000;
  public static final int DEFAULT_ZK_RETRIES = 3;
  public static final int DEFAULT_MAX_RESUBMIT = 3;
  public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min

  private final Stoppable stopper;
  private final MasterServices master;
  private final ServerName serverName;
  // Invoked when a worker reports a task as done; may be null, in which case done tasks are
  // marked successful directly.
  private final TaskFinisher taskFinisher;
  // File system of the last log dir visited by getFileList(); reassigned for every dir scanned.
  private FileSystem fs;
  private Configuration conf;

  // Retry count handed to the asynchronous zookeeper create/getData/delete calls.
  private long zkretries;
  // Read from hbase.splitlog.max.resubmit.
  private long resubmit_threshold;
  // Read from hbase.splitlog.manager.timeout.
  private long timeout;
  // Read from hbase.splitlog.manager.unassigned.timeout.
  private long unassignedTimeout;
  // Time of the most recent successful task znode creation; Long.MAX_VALUE until one happens.
  private long lastNodeCreateTime = Long.MAX_VALUE;
  public boolean ignoreZKDeleteForTesting = false;
  // Reset to Long.MAX_VALUE once all recovering znodes have been garbage collected.
  private volatile long lastRecoveringNodeCreationTime = Long.MAX_VALUE;
  // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
  // whether to GC stale recovering znodes
  private long checkRecoveringTimeThreshold = 15000; // 15 seconds
  // Servers whose recovering-region cleanup hit a ZooKeeper error and should be retried.
  private final Set<ServerName> failedRecoveringRegionDeletions = Collections
      .synchronizedSet(new HashSet<ServerName>());

  /**
   * In distributedLogReplay mode, we need touch both splitlog and recovering-regions znodes in one
   * operation. So the lock is used to guard such cases.
   */
  protected final ReentrantLock recoveringRegionLock = new ReentrantLock();

  // Whether distributed log replay is enabled (HConstants.DISTRIBUTED_LOG_REPLAY_KEY).
  final boolean distributedLogReplay;

  // Tasks tracked by this manager, keyed by task znode path.
  private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
  private TimeoutMonitor timeoutMonitor;

  private volatile Set<ServerName> deadWorkers = null;
  private final Object deadWorkersLock = new Object();

  // NOTE(review): presumably task znode paths whose deletion failed and is to be retried by the
  // TimeoutMonitor - confirm against the code outside this view.
  private Set<String> failedDeletions = null;
147 
148   /**
149    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
150    *   Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
151    * that provides a task finisher for copying recovered edits to their final destination.
152    * The task finisher has to be robust because it can be arbitrarily restarted or called
153    * multiple times.
154    * 
155    * @param zkw
156    * @param conf
157    * @param stopper
158    * @param serverName
159    */
160   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
161        Stoppable stopper, MasterServices master, ServerName serverName) {
162     this(zkw, conf, stopper,  master, serverName, new TaskFinisher() {
163       @Override
164       public Status finish(ServerName workerName, String logfile) {
165         try {
166           HLogSplitter.finishSplitLogFile(logfile, conf);
167         } catch (IOException e) {
168           LOG.warn("Could not finish splitting of log file " + logfile, e);
169           return Status.ERR;
170         }
171         return Status.DONE;
172       }
173     });
174   }
175 
176   /**
177    * Its OK to construct this object even when region-servers are not online. It
178    * does lookup the orphan tasks in zk but it doesn't block waiting for them
179    * to be done.
180    *
181    * @param zkw
182    * @param conf
183    * @param stopper
184    * @param serverName
185    * @param tf task finisher 
186    */
187   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
188         Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf) {
189     super(zkw);
190     this.taskFinisher = tf;
191     this.conf = conf;
192     this.stopper = stopper;
193     this.master = master;
194     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
195     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
196     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
197     this.unassignedTimeout =
198       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
199     LOG.info("timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout);
200 
201     this.serverName = serverName;
202     this.timeoutMonitor = new TimeoutMonitor(
203       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
204 
205     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
206     this.distributedLogReplay = this.conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, 
207       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
208     LOG.info("distributedLogReplay = " + this.distributedLogReplay);
209   }
210 
211   public void finishInitialization(boolean masterRecovery) {
212     if (!masterRecovery) {
213       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
214           + ".splitLogManagerTimeoutMonitor");
215     }
216     // Watcher can be null during tests with Mock'd servers.
217     if (this.watcher != null) {
218       this.watcher.registerListener(this);
219       lookForOrphans();
220     }
221   }
222 
223   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
224     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
225     for (Path hLogDir : logDirs) {
226       this.fs = hLogDir.getFileSystem(conf);
227       if (!fs.exists(hLogDir)) {
228         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
229         continue;
230       }
231       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
232       if (logfiles == null || logfiles.length == 0) {
233         LOG.info(hLogDir + " is empty dir, no logs to split");
234       } else {
235         for (FileStatus status : logfiles)
236           fileStatus.add(status);
237       }
238     }
239     FileStatus[] a = new FileStatus[fileStatus.size()];
240     return fileStatus.toArray(a);
241   }
242 
243   /**
244    * @param logDir
245    *            one region sever hlog dir path in .logs
246    * @throws IOException
247    *             if there was an error while splitting any log file
248    * @return cumulative size of the logfiles split
249    * @throws IOException 
250    */
251   public long splitLogDistributed(final Path logDir) throws IOException {
252     List<Path> logDirs = new ArrayList<Path>();
253     logDirs.add(logDir);
254     return splitLogDistributed(logDirs);
255   }
256 
257   /**
258    * The caller will block until all the log files of the given region server
259    * have been processed - successfully split or an error is encountered - by an
260    * available worker region server. This method must only be called after the
261    * region servers have been brought online.
262    *
263    * @param logDirs List of log dirs to split
264    * @throws IOException If there was an error while splitting any log file
265    * @return cumulative size of the logfiles split
266    */
267   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
268     if (logDirs.isEmpty()) {
269       return 0;
270     }
271     Set<ServerName> serverNames = new HashSet<ServerName>();
272     for (Path logDir : logDirs) {
273       try {
274         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
275         if (serverName != null) {
276           serverNames.add(serverName);
277         }
278       } catch (IllegalArgumentException e) {
279         // ignore invalid format error.
280         LOG.warn("Cannot parse server name from " + logDir);
281       }
282     }
283     return splitLogDistributed(serverNames, logDirs, null);
284   }
285 
286   /**
287    * The caller will block until all the META log files of the given region server
288    * have been processed - successfully split or an error is encountered - by an
289    * available worker region server. This method must only be called after the
290    * region servers have been brought online.
291    *
292    * @param logDirs List of log dirs to split
293    * @param filter the Path filter to select specific files for considering
294    * @throws IOException If there was an error while splitting any log file
295    * @return cumulative size of the logfiles split
296    */
297   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
298       PathFilter filter) throws IOException {
299     MonitoredTask status = TaskMonitor.get().createStatus(
300           "Doing distributed log split in " + logDirs);
301     FileStatus[] logfiles = getFileList(logDirs, filter);
302     status.setStatus("Checking directory contents...");
303     LOG.debug("Scheduling batch of logs to split");
304     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
305     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
306     long t = EnvironmentEdgeManager.currentTimeMillis();
307     long totalSize = 0;
308     TaskBatch batch = new TaskBatch();
309     for (FileStatus lf : logfiles) {
310       // TODO If the log file is still being written to - which is most likely
311       // the case for the last log file - then its length will show up here
312       // as zero. The size of such a file can only be retrieved after
313       // recover-lease is done. totalSize will be under in most cases and the
314       // metrics that it drives will also be under-reported.
315       totalSize += lf.getLen();
316       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
317       if (!enqueueSplitTask(pathToLog, batch)) {
318         throw new IOException("duplicate log split scheduled for " + lf.getPath());
319       }
320     }
321     waitForSplittingCompletion(batch, status);
322     // remove recovering regions from ZK
323     this.removeRecoveringRegionsFromZK(serverNames);
324 
325     if (batch.done != batch.installed) {
326       batch.isDead = true;
327       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
328       LOG.warn("error while splitting logs in " + logDirs +
329       " installed = " + batch.installed + " but only " + batch.done + " done");
330       String msg = "error or interrupted while splitting logs in "
331         + logDirs + " Task = " + batch;
332       status.abort(msg);
333       throw new IOException(msg);
334     }
335     for(Path logDir: logDirs){
336       status.setStatus("Cleaning up log directory...");
337       try {
338         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
339           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
340         }
341       } catch (IOException ioe) {
342         FileStatus[] files = fs.listStatus(logDir);
343         if (files != null && files.length > 0) {
344           LOG.warn("returning success without actually splitting and " + 
345               "deleting all the log files in path " + logDir);
346         } else {
347           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
348         }
349       }
350       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
351     }
352     String msg = "finished splitting (more than or equal to) " + totalSize +
353         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
354         (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
355     status.markComplete(msg);
356     LOG.info(msg);
357     return totalSize;
358   }
359 
360   /**
361    * Add a task entry to splitlog znode if it is not already there.
362    * 
363    * @param taskname the path of the log to be split
364    * @param batch the batch this task belongs to
365    * @return true if a new entry is created, false if it is already there.
366    */
367   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
368     SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
369     // This is a znode path under the splitlog dir with the rest of the path made up of an
370     // url encoding of the passed in log to split.
371     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
372     Task oldtask = createTaskIfAbsent(path, batch);
373     if (oldtask == null) {
374       // publish the task in zk
375       createNode(path, zkretries);
376       return true;
377     }
378     return false;
379   }
380 
381   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
382     synchronized (batch) {
383       while ((batch.done + batch.error) != batch.installed) {
384         try {
385           status.setStatus("Waiting for distributed tasks to finish. "
386               + " scheduled=" + batch.installed
387               + " done=" + batch.done
388               + " error=" + batch.error);
389           int remaining = batch.installed - (batch.done + batch.error);
390           int actual = activeTasks(batch);
391           if (remaining != actual) {
392             LOG.warn("Expected " + remaining
393               + " active tasks, but actually there are " + actual);
394           }
395           int remainingInZK = remainingTasksInZK();
396           if (remainingInZK >= 0 && actual > remainingInZK) {
397             LOG.warn("Expected at least" + actual
398               + " tasks in ZK, but actually there are " + remainingInZK);
399           }
400           if (remainingInZK == 0 || actual == 0) {
401             LOG.warn("No more task remaining (ZK or task map), splitting "
402               + "should have completed. Remaining tasks in ZK " + remainingInZK
403               + ", active tasks in map " + actual);
404             if (remainingInZK == 0 && actual == 0) {
405               return;
406             }
407           }
408           batch.wait(100);
409           if (stopper.isStopped()) {
410             LOG.warn("Stopped while waiting for log splits to be completed");
411             return;
412           }
413         } catch (InterruptedException e) {
414           LOG.warn("Interrupted while waiting for log splits to be completed");
415           Thread.currentThread().interrupt();
416           return;
417         }
418       }
419     }
420   }
421 
422   private int activeTasks(final TaskBatch batch) {
423     int count = 0;
424     for (Task t: tasks.values()) {
425       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
426         count++;
427       }
428     }
429     return count;
430   }
431 
432   private int remainingTasksInZK() {
433     int count = 0;
434     try {
435       List<String> tasks =
436         ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
437       if (tasks != null) {
438         for (String t: tasks) {
439           if (!ZKSplitLog.isRescanNode(watcher, t)) {
440             count++;
441           }
442         }
443       }
444     } catch (KeeperException ke) {
445       LOG.warn("Failed to check remaining tasks", ke);
446       count = -1;
447     }
448     return count;
449   }
450 
451   /**
452    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
453    * region server hosting the region can allow reads to the recovered region
454    * @param serverNames servers which are just recovered
455    */
456   private void removeRecoveringRegionsFromZK(final Set<ServerName> serverNames) {
457 
458     if (!this.distributedLogReplay) {
459       // the function is only used in WALEdit direct replay mode
460       return;
461     }
462 
463     int count = 0;
464     Set<String> recoveredServerNameSet = new HashSet<String>();
465     if (serverNames != null) {
466       for (ServerName tmpServerName : serverNames) {
467         recoveredServerNameSet.add(tmpServerName.getServerName());
468       }
469     }
470 
471     try {
472       this.recoveringRegionLock.lock();
473 
474       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
475       if (tasks != null) {
476         for (String t : tasks) {
477           if (!ZKSplitLog.isRescanNode(watcher, t)) {
478             count++;
479           }
480         }
481       }
482       if (count == 0 && this.master.isInitialized()
483           && !this.master.getServerManager().areDeadServersInProgress()) {
484         // no splitting work items left
485         deleteRecoveringRegionZNodes(null);
486         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
487         // this point.
488         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
489       } else if (!recoveredServerNameSet.isEmpty()) {
490         // remove recovering regions which doesn't have any RS associated with it
491         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
492         if (regions != null) {
493           for (String region : regions) {
494             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
495             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
496             if (failedServers == null || failedServers.isEmpty()) {
497               ZKUtil.deleteNode(watcher, nodePath);
498               continue;
499             } 
500             if (recoveredServerNameSet.containsAll(failedServers)) {
501               ZKUtil.deleteNodeRecursively(watcher, nodePath);
502             } else {
503               for (String failedServer : failedServers) {
504                 if (recoveredServerNameSet.contains(failedServer)) {
505                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
506                   ZKUtil.deleteNode(watcher, tmpPath);
507                 }
508               }
509             }
510           }
511         }
512       }
513     } catch (KeeperException ke) {
514       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
515       if (serverNames != null && !serverNames.isEmpty()) {
516         this.failedRecoveringRegionDeletions.addAll(serverNames);
517       }
518     } finally {
519       this.recoveringRegionLock.unlock();
520     }
521   }
522 
523   /**
524    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
525    * during master initialization phase.
526    * @param failedServers A set of known failed servers
527    * @throws KeeperException
528    */
529   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
530       throws KeeperException {
531 
532     if (!this.distributedLogReplay) {
533       // the function is only used in distributedLogReplay mode when master is in initialization
534       return;
535     }
536 
537     Set<String> knownFailedServers = new HashSet<String>();
538     if (failedServers != null) {
539       for (ServerName tmpServerName : failedServers) {
540         knownFailedServers.add(tmpServerName.getServerName());
541       }
542     }
543 
544     this.recoveringRegionLock.lock();
545     try {
546       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
547       if (tasks != null) {
548         for (String t : tasks) {
549           byte[] data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
550           if (data != null) {
551             SplitLogTask slt = null;
552             try {
553               slt = SplitLogTask.parseFrom(data);
554             } catch (DeserializationException e) {
555               LOG.warn("Failed parse data for znode " + t, e);
556             }
557             if (slt != null && slt.isDone()) {
558               continue;
559             }
560           }
561           // decode the file name
562           t = ZKSplitLog.getFileName(t);
563           ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
564           if (serverName != null) {
565             knownFailedServers.add(serverName.getServerName());
566           } else {
567             LOG.warn("Found invalid WAL log file name:" + t);
568           }
569         }
570       }
571 
572       // remove recovering regions which doesn't have any RS associated with it
573       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
574       if (regions != null) {
575         for (String region : regions) {
576           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
577           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
578           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
579             ZKUtil.deleteNode(watcher, nodePath);
580             continue;
581           }
582           boolean needMoreRecovery = false;
583           for (String tmpFailedServer : regionFailedServers) {
584             if (knownFailedServers.contains(tmpFailedServer)) {
585               needMoreRecovery = true;
586               break;
587             }
588           }
589           if (!needMoreRecovery) {
590             ZKUtil.deleteNode(watcher, nodePath);
591           }
592         }
593       }
594     } finally {
595       this.recoveringRegionLock.unlock();
596     }
597   }
598 
599   private void deleteRecoveringRegionZNodes(List<String> regions) {
600     try {
601       if (regions == null) {
602         // remove all children under /home/recovering-regions
603         LOG.info("Garbage collecting all recovering regions.");
604         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
605       } else {
606         for (String curRegion : regions) {
607           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
608           ZKUtil.deleteNodeRecursively(watcher, nodePath);
609         }
610       }
611     } catch (KeeperException e) {
612       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
613     }
614   }
615 
  /**
   * Records that the task at {@code path} terminated with {@code status}, then queues the
   * asynchronous deletion of its znode.
   * <p>If no task is tracked for the path, a non-RESCAN path is counted as an unacquired orphan
   * that is done. Otherwise, under the task's monitor, a task still IN_PROGRESS is updated:
   * <ul>
   * <li>its status is set;</li>
   * <li>the success/error counters are bumped;</li>
   * <li>under its batch's monitor (taken after the task monitor), the batch's done or error
   * count is incremented and a waiter on the batch is notified.</li>
   * </ul>
   * A task that already left IN_PROGRESS is not touched again, so repeated calls do not double
   * count. The znode deletion is attempted in every case.
   *
   * @param path task znode path
   * @param status terminal status to record
   */
  private void setDone(String path, TerminationStatus status) {
    Task task = tasks.get(path);
    if (task == null) {
      if (!ZKSplitLog.isRescanNode(watcher, path)) {
        SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
        LOG.debug("unacquired orphan task is done " + path);
      }
    } else {
      synchronized (task) {
        // Only the first transition out of IN_PROGRESS is counted.
        if (task.status == IN_PROGRESS) {
          if (status == SUCCESS) {
            SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
            LOG.info("Done splitting " + path);
          } else {
            SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
            LOG.warn("Error splitting " + path);
          }
          task.status = status;
          if (task.batch != null) {
            synchronized (task.batch) {
              if (status == SUCCESS) {
                task.batch.done++;
              } else {
                task.batch.error++;
              }
              // Wakes a thread waiting on this batch (see waitForSplittingCompletion).
              task.batch.notify();
            }
          }
        }
      }
    }
    // delete the task node in zk. It's an async
    // call and no one is blocked waiting for this node to be deleted. All
    // task names are unique (log.<timestamp>) there is no risk of deleting
    // a future task.
    // if a deletion fails, TimeoutMonitor will retry the same deletion later
    deleteNode(path, zkretries);
    return;
  }
655 
656   private void createNode(String path, Long retry_count) {
657     SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
658     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
659     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
660     return;
661   }
662 
663   private void createNodeSuccess(String path) {
664     lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
665     LOG.debug("put up splitlog task at znode " + path);
666     getDataSetWatch(path, zkretries);
667   }
668 
669   private void createNodeFailure(String path) {
670     // TODO the Manager should split the log locally instead of giving up
671     LOG.warn("failed to create task node" + path);
672     setDone(path, FAILURE);
673   }
674 
675 
676   private void getDataSetWatch(String path, Long retry_count) {
677     this.watcher.getRecoverableZooKeeper().getZooKeeper().
678         getData(path, this.watcher,
679         new GetDataAsyncCallback(), retry_count);
680     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
681   }
682 
683   private void tryGetDataSetWatch(String path) {
684     // A negative retry count will lead to ignoring all error processing.
685     this.watcher.getRecoverableZooKeeper().getZooKeeper().
686         getData(path, this.watcher,
687         new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
688     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
689   }
690 
691   private void getDataSetWatchSuccess(String path, byte[] data, int version)
692   throws DeserializationException {
693     if (data == null) {
694       if (version == Integer.MIN_VALUE) {
695         // assume all done. The task znode suddenly disappeared.
696         setDone(path, SUCCESS);
697         return;
698       }
699       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
700       LOG.fatal("logic error - got null data " + path);
701       setDone(path, FAILURE);
702       return;
703     }
704     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
705     SplitLogTask slt = SplitLogTask.parseFrom(data);
706     if (slt.isUnassigned()) {
707       LOG.debug("task not yet acquired " + path + " ver = " + version);
708       handleUnassignedTask(path);
709     } else if (slt.isOwned()) {
710       heartbeat(path, version, slt.getServerName());
711     } else if (slt.isResigned()) {
712       LOG.info("task " + path + " entered state: " + slt.toString());
713       resubmitOrFail(path, FORCE);
714     } else if (slt.isDone()) {
715       LOG.info("task " + path + " entered state: " + slt.toString());
716       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
717         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
718           setDone(path, SUCCESS);
719         } else {
720           resubmitOrFail(path, CHECK);
721         }
722       } else {
723         setDone(path, SUCCESS);
724       }
725     } else if (slt.isErr()) {
726       LOG.info("task " + path + " entered state: " + slt.toString());
727       resubmitOrFail(path, CHECK);
728     } else {
729       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
730       setDone(path, FAILURE);
731     }
732   }
733 
734   private void getDataSetWatchFailure(String path) {
735     LOG.warn("failed to set data watch " + path);
736     setDone(path, FAILURE);
737   }
738 
739   /**
740    * It is possible for a task to stay in UNASSIGNED state indefinitely - say
741    * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
742    * state but it dies before it could create the RESCAN task node to signal
743    * the SplitLogWorkers to pick up the task. To prevent this scenario the
744    * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
745    *
746    * @param path
747    */
748   private void handleUnassignedTask(String path) {
749     if (ZKSplitLog.isRescanNode(watcher, path)) {
750       return;
751     }
752     Task task = findOrCreateOrphanTask(path);
753     if (task.isOrphan() && (task.incarnation == 0)) {
754       LOG.info("resubmitting unassigned orphan task " + path);
755       // ignore failure to resubmit. The timeout-monitor will handle it later
756       // albeit in a more crude fashion
757       resubmit(path, task, FORCE);
758     }
759   }
760 
761   /**
762    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
763    * @param statusCode integer value of a ZooKeeper exception code
764    * @param action description message about the retried action
765    * @return true when need to abandon retries otherwise false
766    */
767   private boolean needAbandonRetries(int statusCode, String action) {
768     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
769       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
770           + "action=" + action);
771       return true;
772     }
773     return false;
774   }
775 
776   private void heartbeat(String path, int new_version, ServerName workerName) {
777     Task task = findOrCreateOrphanTask(path);
778     if (new_version != task.last_version) {
779       if (task.isUnassigned()) {
780         LOG.info("task " + path + " acquired by " + workerName);
781       }
782       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
783       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
784     } else {
785       // duplicate heartbeats - heartbeats w/o zk node version
786       // changing - are possible. The timeout thread does
787       // getDataSetWatch() just to check whether a node still
788       // exists or not
789     }
790     return;
791   }
792 
793   private boolean resubmit(String path, Task task, ResubmitDirective directive) {
794     // its ok if this thread misses the update to task.deleted. It will fail later
795     if (task.status != IN_PROGRESS) {
796       return false;
797     }
798     int version;
799     if (directive != FORCE) {
800       // We're going to resubmit:
801       //  1) immediately if the worker server is now marked as dead
802       //  2) after a configurable timeout if the server is not marked as dead but has still not
803       //       finished the task. This allows to continue if the worker cannot actually handle it,
804       //       for any reason.
805       final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
806       final boolean alive = master.getServerManager() != null ?
807           master.getServerManager().isServerOnline(task.cur_worker_name) : true;
808       if (alive && time < timeout) {
809         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
810             task.cur_worker_name + " is not marked as dead, we waited for " + time +
811             " while the timeout is " + timeout);
812         return false;
813       }
814       if (task.unforcedResubmits >= resubmit_threshold) {
815         if (!task.resubmitThresholdReached) {
816           task.resubmitThresholdReached = true;
817           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
818           LOG.info("Skipping resubmissions of task " + path +
819               " because threshold " + resubmit_threshold + " reached");
820         }
821         return false;
822       }
823       // race with heartbeat() that might be changing last_version
824       version = task.last_version;
825     } else {
826       version = -1;
827     }
828     LOG.info("resubmitting task " + path);
829     task.incarnation++;
830     try {
831       // blocking zk call but this is done from the timeout thread
832       SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
833       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
834         LOG.debug("failed to resubmit task " + path +
835             " version changed");
836         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
837         return false;
838       }
839     } catch (NoNodeException e) {
840       LOG.warn("failed to resubmit because znode doesn't exist " + path +
841           " task done (or forced done by removing the znode)");
842       try {
843         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
844       } catch (DeserializationException e1) {
845         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
846         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
847         return false;
848       }
849       return false;
850     } catch (KeeperException.BadVersionException e) {
851       LOG.debug("failed to resubmit task " + path + " version changed");
852       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
853       return false;
854     } catch (KeeperException e) {
855       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
856       LOG.warn("failed to resubmit " + path, e);
857       return false;
858     }
859     // don't count forced resubmits
860     if (directive != FORCE) {
861       task.unforcedResubmits++;
862     }
863     task.setUnassigned();
864     createRescanNode(Long.MAX_VALUE);
865     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
866     return true;
867   }
868 
869   private void resubmitOrFail(String path, ResubmitDirective directive) {
870     if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
871       setDone(path, FAILURE);
872     }
873   }
874 
875   private void deleteNode(String path, Long retries) {
876     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
877     // Once a task znode is ready for delete, that is it is in the TASK_DONE
878     // state, then no one should be writing to it anymore. That is no one
879     // will be updating the znode version any more.
880     this.watcher.getRecoverableZooKeeper().getZooKeeper().
881       delete(path, -1, new DeleteAsyncCallback(),
882         retries);
883   }
884 
885   private void deleteNodeSuccess(String path) {
886     if (ignoreZKDeleteForTesting) {
887       return;
888     }
889     Task task;
890     task = tasks.remove(path);
891     if (task == null) {
892       if (ZKSplitLog.isRescanNode(watcher, path)) {
893         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
894       }
895       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
896       LOG.debug("deleted task without in memory state " + path);
897       return;
898     }
899     synchronized (task) {
900       task.status = DELETED;
901       task.notify();
902     }
903     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
904   }
905 
906   private void deleteNodeFailure(String path) {
907     LOG.info("Failed to delete node " + path + " and will retry soon.");
908     return;
909   }
910 
911   /**
912    * signal the workers that a task was resubmitted by creating the
913    * RESCAN node.
914    * @throws KeeperException 
915    */
916   private void createRescanNode(long retries) {
917     // The RESCAN node will be deleted almost immediately by the
918     // SplitLogManager as soon as it is created because it is being
919     // created in the DONE state. This behavior prevents a buildup
920     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
921     // might miss the watch-trigger that creation of RESCAN node provides.
922     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
923     // therefore this behavior is safe.
924     SplitLogTask slt = new SplitLogTask.Done(this.serverName);
925     this.watcher.getRecoverableZooKeeper().getZooKeeper().
926       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
927         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
928         new CreateRescanAsyncCallback(), Long.valueOf(retries));
929   }
930 
931   private void createRescanSuccess(String path) {
932     lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
933     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
934     getDataSetWatch(path, zkretries);
935   }
936 
937   private void createRescanFailure() {
938     LOG.fatal("logic failure, rescan failure must not happen");
939   }
940 
941   /**
942    * @param path
943    * @param batch
944    * @return null on success, existing task on error
945    */
946   private Task createTaskIfAbsent(String path, TaskBatch batch) {
947     Task oldtask;
948     // batch.installed is only changed via this function and
949     // a single thread touches batch.installed.
950     Task newtask = new Task();
951     newtask.batch = batch;
952     oldtask = tasks.putIfAbsent(path, newtask);
953     if (oldtask == null) {
954       batch.installed++;
955       return  null;
956     }
957     // new task was not used.
958     synchronized (oldtask) {
959       if (oldtask.isOrphan()) {
960         if (oldtask.status == SUCCESS) {
961           // The task is already done. Do not install the batch for this
962           // task because it might be too late for setDone() to update
963           // batch.done. There is no need for the batch creator to wait for
964           // this task to complete.
965           return (null);
966         }
967         if (oldtask.status == IN_PROGRESS) {
968           oldtask.batch = batch;
969           batch.installed++;
970           LOG.debug("Previously orphan task " + path + " is now being waited upon");
971           return null;
972         }
973         while (oldtask.status == FAILURE) {
974           LOG.debug("wait for status of task " + path + " to change to DELETED");
975           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
976           try {
977             oldtask.wait();
978           } catch (InterruptedException e) {
979             Thread.currentThread().interrupt();
980             LOG.warn("Interrupted when waiting for znode delete callback");
981             // fall through to return failure
982             break;
983           }
984         }
985         if (oldtask.status != DELETED) {
986           LOG.warn("Failure because previously failed task" +
987               " state still present. Waiting for znode delete callback" +
988               " path=" + path);
989           return oldtask;
990         }
991         // reinsert the newTask and it must succeed this time
992         Task t = tasks.putIfAbsent(path, newtask);
993         if (t == null) {
994           batch.installed++;
995           return  null;
996         }
997         LOG.fatal("Logic error. Deleted task still present in tasks map");
998         assert false : "Deleted task still present in tasks map";
999         return t;
1000       }
1001       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1002       return oldtask;
1003     }
1004   }
1005 
1006   Task findOrCreateOrphanTask(String path) {
1007     Task orphanTask = new Task();
1008     Task task;
1009     task = tasks.putIfAbsent(path, orphanTask);
1010     if (task == null) {
1011       LOG.info("creating orphan task " + path);
1012       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1013       task = orphanTask;
1014     }
1015     return task;
1016   }
1017 
1018   @Override
1019   public void nodeDataChanged(String path) {
1020     Task task;
1021     task = tasks.get(path);
1022     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1023       if (task != null) {
1024         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1025       }
1026       getDataSetWatch(path, zkretries);
1027     }
1028   }
1029 
1030   public void stop() {
1031     if (timeoutMonitor != null) {
1032       timeoutMonitor.interrupt();
1033     }
1034   }
1035 
1036   private void lookForOrphans() {
1037     List<String> orphans;
1038     try {
1039        orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1040           this.watcher.splitLogZNode);
1041       if (orphans == null) {
1042         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1043         return;
1044       }
1045     } catch (KeeperException e) {
1046       LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1047           " " + StringUtils.stringifyException(e));
1048       return;
1049     }
1050     int rescan_nodes = 0;
1051     for (String path : orphans) {
1052       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1053       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1054         rescan_nodes++;
1055         LOG.debug("found orphan rescan node " + path);
1056       } else {
1057         LOG.info("found orphan task " + path);
1058       }
1059       getDataSetWatch(nodepath, zkretries);
1060     }
1061     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1062         rescan_nodes + " rescan nodes");
1063   }
1064 
1065   /**
1066    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
1067    * all regions of the passed in region servers
1068    * @param serverName the name of a region server
1069    * @param userRegions user regiones assigned on the region server
1070    */
1071   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1072       throws KeeperException {
1073     if (userRegions == null || !this.distributedLogReplay) {
1074       return;
1075     }
1076 
1077     try {
1078       this.recoveringRegionLock.lock();
1079       // mark that we're creating recovering znodes
1080       this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1081 
1082       for (HRegionInfo region : userRegions) {
1083         String regionEncodeName = region.getEncodedName();
1084         long retries = this.zkretries;
1085 
1086         do {
1087           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1088           long lastRecordedFlushedSequenceId = -1;
1089           try {
1090             long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1091               regionEncodeName.getBytes());
1092 
1093             /*
1094              * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
1095              * flushed sequence id for the server]
1096              */
1097             byte[] data = ZKUtil.getData(this.watcher, nodePath);
1098             if (data == null) {
1099               ZKUtil.createSetData(this.watcher, nodePath,
1100                 ZKUtil.positionToByteArray(lastSequenceId));
1101             } else {
1102               lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1103               if (lastRecordedFlushedSequenceId < lastSequenceId) {
1104                 // update last flushed sequence id in the region level
1105                 ZKUtil.setData(this.watcher, nodePath,
1106                   ZKUtil.positionToByteArray(lastSequenceId));
1107               }
1108             }
1109             // go one level deeper with server name
1110             nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1111             if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1112               // the newly assigned RS failed even before any flush to the region
1113               lastSequenceId = lastRecordedFlushedSequenceId;
1114             }
1115             ZKUtil.createSetData(this.watcher, nodePath,
1116               ZKUtil.positionToByteArray(lastSequenceId));
1117             LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1118                 + serverName);
1119 
1120             // break retry loop
1121             break;
1122           } catch (KeeperException e) {
1123             // ignore ZooKeeper exceptions inside retry loop
1124             if (retries <= 1) {
1125               throw e;
1126             }
1127             // wait a little bit for retry
1128             try {
1129               Thread.sleep(20);
1130             } catch (Exception ignoreE) {
1131               // ignore
1132             }
1133           }
1134         } while ((--retries) > 0 && (!this.stopper.isStopped()));
1135       }
1136     } finally {
1137       this.recoveringRegionLock.unlock();
1138     }
1139   }
1140 
1141   /**
1142    * @param bytes - Content of a failed region server or recovering region znode.
1143    * @return long - The last flushed sequence Id for the region server
1144    */
1145   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1146     long lastRecordedFlushedSequenceId = -1l;
1147     try {
1148       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1149     } catch (DeserializationException e) {
1150       lastRecordedFlushedSequenceId = -1l;
1151       LOG.warn("Can't parse last flushed sequence Id", e);
1152     }
1153     return lastRecordedFlushedSequenceId;
1154   }
1155 
1156   /**
1157    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
1158    * @param zkw
1159    * @param serverName
1160    * @param encodedRegionName
1161    * @return the last flushed sequence id recorded in ZK of the region for <code>serverName<code>
1162    * @throws IOException
1163    */
1164   public static long getLastFlushedSequenceId(ZooKeeperWatcher zkw, String serverName,
1165       String encodedRegionName) throws IOException {
1166     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
1167     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
1168     // If the newly assigned RS fails again(a chained RS failures scenario), the last flushed
1169     // sequence Id name space (sequence Id only valid for a particular RS instance), changes 
1170     // when different newly assigned RS flushes the region.
1171     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
1172     // last flushed sequence Id for each failed RS instance.
1173     long lastFlushedSequenceId = -1;
1174     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1175     nodePath = ZKUtil.joinZNode(nodePath, serverName);
1176     try {
1177       byte[] data = ZKUtil.getData(zkw, nodePath);
1178       if (data != null) {
1179         lastFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1180       }
1181     } catch (KeeperException e) {
1182       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1183           + serverName + "; region=" + encodedRegionName, e);
1184     }
1185     return lastFlushedSequenceId;
1186   }
1187 
1188   /**
1189    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
1190    * Clients threads use this object to wait for all their tasks to be done.
1191    * <p>
1192    * All access is synchronized.
1193    */
1194   static class TaskBatch {
1195     int installed = 0;
1196     int done = 0;
1197     int error = 0;
1198     volatile boolean isDead = false;
1199 
1200     @Override
1201     public String toString() {
1202       return ("installed = " + installed + " done = " + done + " error = " + error);
1203     }
1204   }
1205 
1206   /**
1207    * in memory state of an active task.
1208    */
1209   static class Task {
1210     volatile long last_update;
1211     volatile int last_version;
1212     volatile ServerName cur_worker_name;
1213     volatile TaskBatch batch;
1214     volatile TerminationStatus status;
1215     volatile int incarnation;
1216     volatile int unforcedResubmits;
1217     volatile boolean resubmitThresholdReached;
1218 
1219     @Override
1220     public String toString() {
1221       return ("last_update = " + last_update +
1222           " last_version = " + last_version +
1223           " cur_worker_name = " + cur_worker_name +
1224           " status = " + status +
1225           " incarnation = " + incarnation +
1226           " resubmits = " + unforcedResubmits +
1227           " batch = " + batch);
1228     }
1229 
1230     Task() {
1231       incarnation = 0;
1232       last_version = -1;
1233       status = IN_PROGRESS;
1234       setUnassigned();
1235     }
1236 
1237     public boolean isOrphan() {
1238       return (batch == null || batch.isDead);
1239     }
1240 
1241     public boolean isUnassigned() {
1242       return (cur_worker_name == null);
1243     }
1244 
1245     public void heartbeatNoDetails(long time) {
1246       last_update = time;
1247     }
1248 
1249     public void heartbeat(long time, int version, ServerName worker) {
1250       last_version = version;
1251       last_update = time;
1252       cur_worker_name = worker;
1253     }
1254 
1255     public void setUnassigned() {
1256       cur_worker_name = null;
1257       last_update = -1;
1258     }
1259   }
1260 
1261   void handleDeadWorker(ServerName workerName) {
1262     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
1263     // to reason about concurrency. Makes it easier to retry.
1264     synchronized (deadWorkersLock) {
1265       if (deadWorkers == null) {
1266         deadWorkers = new HashSet<ServerName>(100);
1267       }
1268       deadWorkers.add(workerName);
1269     }
1270     LOG.info("dead splitlog worker " + workerName);
1271   }
1272 
1273   void handleDeadWorkers(Set<ServerName> serverNames) {
1274     synchronized (deadWorkersLock) {
1275       if (deadWorkers == null) {
1276         deadWorkers = new HashSet<ServerName>(100);
1277       }
1278       deadWorkers.addAll(serverNames);
1279     }
1280     LOG.info("dead splitlog workers " + serverNames);
1281   }
1282 
1283   /**
1284    * Periodically checks all active tasks and resubmits the ones that have timed
1285    * out
1286    */
1287   private class TimeoutMonitor extends Chore {
1288     public TimeoutMonitor(final int period, Stoppable stopper) {
1289       super("SplitLogManager Timeout Monitor", period, stopper);
1290     }
1291 
1292     @Override
1293     protected void chore() {
1294       int resubmitted = 0;
1295       int unassigned = 0;
1296       int tot = 0;
1297       boolean found_assigned_task = false;
1298       Set<ServerName> localDeadWorkers;
1299 
1300       synchronized (deadWorkersLock) {
1301         localDeadWorkers = deadWorkers;
1302         deadWorkers = null;
1303       }
1304 
1305       for (Map.Entry<String, Task> e : tasks.entrySet()) {
1306         String path = e.getKey();
1307         Task task = e.getValue();
1308         ServerName cur_worker = task.cur_worker_name;
1309         tot++;
1310         // don't easily resubmit a task which hasn't been picked up yet. It
1311         // might be a long while before a SplitLogWorker is free to pick up a
1312         // task. This is because a SplitLogWorker picks up a task one at a
1313         // time. If we want progress when there are no region servers then we
1314         // will have to run a SplitLogWorker thread in the Master.
1315         if (task.isUnassigned()) {
1316           unassigned++;
1317           continue;
1318         }
1319         found_assigned_task = true;
1320         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1321           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1322           if (resubmit(path, task, FORCE)) {
1323             resubmitted++;
1324           } else {
1325             handleDeadWorker(cur_worker);
1326             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1327                 cur_worker + ", will retry.");
1328           }
1329         } else if (resubmit(path, task, CHECK)) {
1330           resubmitted++;
1331         }
1332       }
1333       if (tot > 0) {
1334         LOG.debug("total tasks = " + tot + " unassigned = " + unassigned);
1335       }
1336       if (resubmitted > 0) {
1337         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1338       }
1339       // If there are pending tasks and all of them have been unassigned for
1340       // some time then put up a RESCAN node to ping the workers.
1341       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
1342       // because a. it is very unlikely that every worker had a
1343       // transient error when trying to grab the task b. if there are no
1344       // workers then all tasks wills stay unassigned indefinitely and the
1345       // manager will be indefinitely creating RESCAN nodes. TODO may be the
1346       // master should spawn both a manager and a worker thread to guarantee
1347       // that there is always one worker in the system
1348       if (tot > 0 && !found_assigned_task &&
1349           ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
1350           unassignedTimeout)) {
1351         for (Map.Entry<String, Task> e : tasks.entrySet()) {
1352           String path = e.getKey();
1353           Task task = e.getValue();
1354           // we have to do task.isUnassigned() check again because tasks might
1355           // have been asynchronously assigned. There is no locking required
1356           // for these checks ... it is OK even if tryGetDataSetWatch() is
1357           // called unnecessarily for a task
1358           if (task.isUnassigned() && (task.status != FAILURE)) {
1359             // We just touch the znode to make sure its still there
1360             tryGetDataSetWatch(path);
1361           }
1362         }
1363         createRescanNode(Long.MAX_VALUE);
1364         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1365         LOG.debug("resubmitting unassigned task(s) after timeout");
1366       }
1367 
1368       // Retry previously failed deletes
1369       if (failedDeletions.size() > 0) {
1370         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1371         for (String tmpPath : tmpPaths) {
1372           // deleteNode is an async call
1373           deleteNode(tmpPath, zkretries);
1374         }
1375         failedDeletions.removeAll(tmpPaths);
1376       }
1377 
1378       // Garbage collect left-over /hbase/recovering-regions/... znode
1379       long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1380           - lastRecoveringNodeCreationTime;
1381       if (!failedRecoveringRegionDeletions.isEmpty()
1382           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1383         // inside the function there have more checks before GC anything
1384         Set<ServerName> previouslyFailedDeletoins = null;
1385         if (!failedRecoveringRegionDeletions.isEmpty()) {
1386           previouslyFailedDeletoins = new HashSet<ServerName>(failedRecoveringRegionDeletions);
1387           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletoins);
1388         }
1389         removeRecoveringRegionsFromZK(previouslyFailedDeletoins);
1390       }
1391     }
1392   }
1393 
1394   /**
1395    * Asynchronous handler for zk create node results.
1396    * Retries on failures.
1397    */
1398   class CreateAsyncCallback implements AsyncCallback.StringCallback {
1399     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1400 
1401     @Override
1402     public void processResult(int rc, String path, Object ctx, String name) {
1403       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1404       if (rc != 0) {
1405         if (needAbandonRetries(rc, "Create znode " + path)) {
1406           createNodeFailure(path);
1407           return;
1408         }
1409         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1410           // What if there is a delete pending against this pre-existing
1411           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
1412           // state. Only operations that will be carried out on this node by
1413           // this manager are get-znode-data, task-finisher and delete-znode.
1414           // And all code pieces correctly handle the case of suddenly
1415           // disappearing task-znode.
1416           LOG.debug("found pre-existing znode " + path);
1417           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1418         } else {
1419           Long retry_count = (Long)ctx;
1420           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1421               path + " remaining retries=" + retry_count);
1422           if (retry_count == 0) {
1423             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1424             createNodeFailure(path);
1425           } else {
1426             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1427             createNode(path, retry_count - 1);
1428           }
1429           return;
1430         }
1431       }
1432       createNodeSuccess(path);
1433     }
1434   }
1435 
1436   /**
1437    * Asynchronous handler for zk get-data-set-watch on node results.
1438    * Retries on failures.
1439    */
1440   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1441     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1442 
1443     @Override
1444     public void processResult(int rc, String path, Object ctx, byte[] data,
1445         Stat stat) {
1446       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1447       if (rc != 0) {
1448         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1449           return;
1450         }
1451         if (rc == KeeperException.Code.NONODE.intValue()) {
1452           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1453           // The task znode has been deleted. Must be some pending delete
1454           // that deleted the task. Assume success because a task-znode is
1455           // is only deleted after TaskFinisher is successful.
1456           LOG.warn("task znode " + path + " vanished.");
1457           try {
1458             getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1459           } catch (DeserializationException e) {
1460             LOG.warn("Deserialization problem", e);
1461           }
1462           return;
1463         }
1464         Long retry_count = (Long) ctx;
1465 
1466         if (retry_count < 0) {
1467           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1468               path + ". Ignoring error. No error handling. No retrying.");
1469           return;
1470         }
1471         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1472             path + " remaining retries=" + retry_count);
1473         if (retry_count == 0) {
1474           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1475           getDataSetWatchFailure(path);
1476         } else {
1477           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1478           getDataSetWatch(path, retry_count - 1);
1479         }
1480         return;
1481       }
1482       try {
1483         getDataSetWatchSuccess(path, data, stat.getVersion());
1484       } catch (DeserializationException e) {
1485         LOG.warn("Deserialization problem", e);
1486       }
1487       return;
1488     }
1489   }
1490 
1491   /**
1492    * Asynchronous handler for zk delete node results.
1493    * Retries on failures.
1494    */
1495   class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1496     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1497 
1498     @Override
1499     public void processResult(int rc, String path, Object ctx) {
1500       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1501       if (rc != 0) {
1502         if (needAbandonRetries(rc, "Delete znode " + path)) {
1503           failedDeletions.add(path);
1504           return;
1505         }
1506         if (rc != KeeperException.Code.NONODE.intValue()) {
1507           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1508           Long retry_count = (Long) ctx;
1509           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1510               path + " remaining retries=" + retry_count);
1511           if (retry_count == 0) {
1512             LOG.warn("delete failed " + path);
1513             failedDeletions.add(path);
1514             deleteNodeFailure(path);
1515           } else {
1516             deleteNode(path, retry_count - 1);
1517           }
1518           return;
1519         } else {
1520           LOG.info(path +
1521             " does not exist. Either was created but deleted behind our" +
1522             " back by another pending delete OR was deleted" +
1523             " in earlier retry rounds. zkretries = " + (Long) ctx);
1524         }
1525       } else {
1526         LOG.debug("deleted " + path);
1527       }
1528       deleteNodeSuccess(path);
1529     }
1530   }
1531 
1532   /**
1533    * Asynchronous handler for zk create RESCAN-node results.
1534    * Retries on failures.
1535    * <p>
1536    * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal
1537    * for all the {@link SplitLogWorker}s to rescan for new tasks.
1538    */
1539   class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1540     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1541 
1542     @Override
1543     public void processResult(int rc, String path, Object ctx, String name) {
1544       if (rc != 0) {
1545         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1546           return;
1547         }
1548         Long retry_count = (Long)ctx;
1549         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1550             " remaining retries=" + retry_count);
1551         if (retry_count == 0) {
1552           createRescanFailure();
1553         } else {
1554           createRescanNode(retry_count - 1);
1555         }
1556         return;
1557       }
1558       // path is the original arg, name is the actual name that was created
1559       createRescanSuccess(name);
1560     }
1561   }
1562 
1563   /**
1564    * {@link SplitLogManager} can use objects implementing this interface to
1565    * finish off a partially done task by {@link SplitLogWorker}. This provides
1566    * a serialization point at the end of the task processing. Must be
1567    * restartable and idempotent.
1568    */
1569   static public interface TaskFinisher {
1570     /**
1571      * status that can be returned finish()
1572      */
1573     static public enum Status {
1574       /**
1575        * task completed successfully
1576        */
1577       DONE(),
1578       /**
1579        * task completed with error
1580        */
1581       ERR();
1582     }
1583     /**
1584      * finish the partially done task. workername provides clue to where the
1585      * partial results of the partially done tasks are present. taskname is the
1586      * name of the task that was put up in zookeeper.
1587      * <p>
1588      * @param workerName
1589      * @param taskname
1590      * @return DONE if task completed successfully, ERR otherwise
1591      */
1592     public Status finish(ServerName workerName, String taskname);
1593   }
1594 
1595   enum ResubmitDirective {
1596     CHECK(),
1597     FORCE();
1598   }
1599 
1600   enum TerminationStatus {
1601     IN_PROGRESS("in_progress"),
1602     SUCCESS("success"),
1603     FAILURE("failure"),
1604     DELETED("deleted");
1605 
1606     String statusMsg;
1607     TerminationStatus(String msg) {
1608       statusMsg = msg;
1609     }
1610     
1611     @Override
1612     public String toString() {
1613       return statusMsg;
1614     }
1615   }
1616   
1617   /**
1618    * Completes the initialization
1619    */
1620   public void finishInitialization() {
1621     finishInitialization(false);
1622   }
1623   
1624 }