1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.SplitLogCounters;
53  import org.apache.hadoop.hbase.SplitLogTask;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.exceptions.DeserializationException;
56  import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
57  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
58  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
59  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
60  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
61  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
62  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
63  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
64  import org.apache.hadoop.hbase.util.FSUtils;
65  import org.apache.hadoop.hbase.util.Pair;
66  import org.apache.hadoop.hbase.util.Threads;
67  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
68  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
69  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
70  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
71  import org.apache.hadoop.util.StringUtils;
72  import org.apache.zookeeper.AsyncCallback;
73  import org.apache.zookeeper.CreateMode;
74  import org.apache.zookeeper.KeeperException;
75  import org.apache.zookeeper.KeeperException.NoNodeException;
76  import org.apache.zookeeper.ZooDefs.Ids;
77  import org.apache.zookeeper.data.Stat;
78  
79  /**
80   * Distributes the task of log splitting to the available region servers.
81   * Coordination happens via zookeeper. For every log file that has to be split a
82   * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
83   *
84   * <p>SplitLogManager monitors the task znodes that it creates using the
85   * timeoutMonitor thread. If a task's progress is slow then
86   * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
87   * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
88   * task's znode is deleted by SplitLogManager.
89   *
90   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
91   * log files. The caller thread waits in this method until all the log files
92   * have been split.
93   *
94   * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
95   * to help reduce response time seen by the callers.
96   *
97   * <p>There is a race in this design between the SplitLogManager and the
98   * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
99   * already been completed by a SplitLogWorker. We rely on the idempotency of
100  * the log splitting task for correctness.
101  *
102  * <p>It is also assumed that every log splitting task is unique and once
103  * completed (either with success or with error) it will not be submitted
104  * again. If a task is resubmitted then there is a risk that the old "delete task"
105  * can delete the re-submission.
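 *
 * <p>A minimal caller-side sketch (illustrative only, not code from this class): a master
 * component holding an initialized SplitLogManager could split a dead region server's WAL
 * directory roughly as below. The variables <code>slm</code>, <code>conf</code> and
 * <code>failedServer</code> are assumptions for the example; the real call path goes through
 * MasterFileSystem, which performs additional renaming and bookkeeping first.
 *
 * <pre>
 * Path logsDir = new Path(FSUtils.getRootDir(conf), HConstants.HREGION_LOGDIR_NAME);
 * Path deadServerLogDir = new Path(logsDir, failedServer.getServerName());
 * // Blocks until every WAL under the directory has been split by some SplitLogWorker.
 * long bytesSplit = slm.splitLogDistributed(deadServerLogDir);
 * </pre>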
106  */
107 @InterfaceAudience.Private
108 public class SplitLogManager extends ZooKeeperListener {
109   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
110 
111   public static final int DEFAULT_TIMEOUT = 120000;
112   public static final int DEFAULT_ZK_RETRIES = 3;
113   public static final int DEFAULT_MAX_RESUBMIT = 3;
114   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
115 
116   private final Stoppable stopper;
117   private final MasterServices master;
118   private final ServerName serverName;
119   private final TaskFinisher taskFinisher;
120   private FileSystem fs;
121   private Configuration conf;
122 
123   private long zkretries;
124   private long resubmit_threshold;
125   private long timeout;
126   private long unassignedTimeout;
127   private long lastNodeCreateTime = Long.MAX_VALUE;
128   public boolean ignoreZKDeleteForTesting = false;
129   private volatile long lastRecoveringNodeCreationTime = 0;
130   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
131   // whether to GC stale recovering znodes
132   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
133   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
134       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
135 
136   /**
137   * In distributedLogReplay mode, we need to touch both splitlog and recovering-regions znodes in one
138    * operation. So the lock is used to guard such cases.
139    */
140   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
141 
142   final boolean distributedLogReplay;
143 
144   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
145   private TimeoutMonitor timeoutMonitor;
146 
147   private volatile Set<ServerName> deadWorkers = null;
148   private final Object deadWorkersLock = new Object();
149 
150   private Set<String> failedDeletions = null;
151 
152   /**
153    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
154    *   Stoppable stopper, MasterServices master, ServerName serverName,
155    *   boolean masterRecovery, TaskFinisher tf)}
156    * with masterRecovery = false, and tf = null.  Used in unit tests.
157    *
158    * @param zkw the ZK watcher
159    * @param conf the HBase configuration
160    * @param stopper the stoppable in case anything is wrong
161    * @param master the master services
162    * @param serverName the master server name
163    */
164   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
165       Stoppable stopper, MasterServices master, ServerName serverName) {
166     this(zkw, conf, stopper, master, serverName, false, null);
167   }
168 
169   /**
170    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
171    *   Stoppable stopper, MasterServices master, ServerName serverName,
172    *   boolean masterRecovery, TaskFinisher tf)}
173    * that provides a task finisher for copying recovered edits to their final destination.
174    * The task finisher has to be robust because it can be arbitrarily restarted or called
175    * multiple times.
176    *
177    * @param zkw the ZK watcher
178    * @param conf the HBase configuration
179    * @param stopper the stoppable in case anything is wrong
180    * @param master the master services
181    * @param serverName the master server name
182    * @param masterRecovery an indication if the master is in recovery
183    */
184   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
185       Stoppable stopper, MasterServices master, ServerName serverName, boolean masterRecovery) {
186     this(zkw, conf, stopper, master, serverName, masterRecovery, new TaskFinisher() {
187       @Override
188       public Status finish(ServerName workerName, String logfile) {
189         try {
190           HLogSplitter.finishSplitLogFile(logfile, conf);
191         } catch (IOException e) {
192           LOG.warn("Could not finish splitting of log file " + logfile, e);
193           return Status.ERR;
194         }
195         return Status.DONE;
196       }
197     });
198   }
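
  // A hedged sketch (not in the original source): a TaskFinisher may be invoked more than once
  // for the same log (retries, manager restart), so any finisher passed to the full constructor
  // should be idempotent. A test-only no-op finisher could look roughly like this; the variable
  // names below are illustrative assumptions.
  //
  //   TaskFinisher noopFinisher = new TaskFinisher() {
  //     @Override
  //     public Status finish(ServerName workerName, String logfile) {
  //       // Safe to call repeatedly; report success without touching any files.
  //       return Status.DONE;
  //     }
  //   };
  //   SplitLogManager slm =
  //       new SplitLogManager(zkw, conf, stopper, master, serverName, false, noopFinisher);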
199 
200   /**
201   * It's OK to construct this object even when region servers are not online. It
202   * does look up the orphan tasks in ZK but it doesn't block waiting for them
203    * to be done.
204    *
205    * @param zkw the ZK watcher
206    * @param conf the HBase configuration
207    * @param stopper the stoppable in case anything is wrong
208    * @param master the master services
209    * @param serverName the master server name
210    * @param masterRecovery an indication if the master is in recovery
211    * @param tf task finisher
212    */
213   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
214         Stoppable stopper, MasterServices master,
215         ServerName serverName, boolean masterRecovery, TaskFinisher tf) {
216     super(zkw);
217     this.taskFinisher = tf;
218     this.conf = conf;
219     this.stopper = stopper;
220     this.master = master;
221     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
222     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
223     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
224     this.unassignedTimeout =
225       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
226     this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(conf);
227     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
228       ", distributedLogReplay=" + this.distributedLogReplay);
229 
230     this.serverName = serverName;
231     this.timeoutMonitor = new TimeoutMonitor(
232       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
233 
234     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
235 
236     if (!masterRecovery) {
237       Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
238           + ".splitLogManagerTimeoutMonitor");
239     }
240     // Watcher can be null during tests with Mock'd servers.
241     if (this.watcher != null) {
242       this.watcher.registerListener(this);
243       lookForOrphans();
244     }
245   }
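
  // Illustrative tuning sketch (assumed values, not recommendations): the knobs read above can
  // be set on the Configuration before this manager is constructed, e.g. in a test harness.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("hbase.splitlog.manager.timeout", 60000);             // resubmit slow tasks after 60s
  //   conf.setInt("hbase.splitlog.max.resubmit", 2);                    // at most 2 unforced resubmits
  //   conf.setInt("hbase.splitlog.manager.unassigned.timeout", 120000); // rescan long-unassigned tasks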
246 
247   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
248     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
249     for (Path hLogDir : logDirs) {
250       this.fs = hLogDir.getFileSystem(conf);
251       if (!fs.exists(hLogDir)) {
252         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
253         continue;
254       }
255       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
256       if (logfiles == null || logfiles.length == 0) {
257         LOG.info(hLogDir + " is empty dir, no logs to split");
258       } else {
259         for (FileStatus status : logfiles)
260           fileStatus.add(status);
261       }
262     }
263     FileStatus[] a = new FileStatus[fileStatus.size()];
264     return fileStatus.toArray(a);
265   }
266 
267   /**
268    * @param logDir
269    *            one region server HLog dir path in .logs
270    * @throws IOException
271    *             if there was an error while splitting any log file
272    * @return cumulative size of the logfiles split
273    *
274    */
275   public long splitLogDistributed(final Path logDir) throws IOException {
276     List<Path> logDirs = new ArrayList<Path>();
277     logDirs.add(logDir);
278     return splitLogDistributed(logDirs);
279   }
280 
281   /**
282    * The caller will block until all the log files of the given region server
283    * have been processed - successfully split or an error is encountered - by an
284    * available worker region server. This method must only be called after the
285    * region servers have been brought online.
286    *
287    * @param logDirs List of log dirs to split
288    * @throws IOException If there was an error while splitting any log file
289    * @return cumulative size of the logfiles split
290    */
291   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
292     if (logDirs.isEmpty()) {
293       return 0;
294     }
295     Set<ServerName> serverNames = new HashSet<ServerName>();
296     for (Path logDir : logDirs) {
297       try {
298         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
299         if (serverName != null) {
300           serverNames.add(serverName);
301         }
302       } catch (IllegalArgumentException e) {
303         // ignore invalid format error.
304         LOG.warn("Cannot parse server name from " + logDir);
305       }
306     }
307     return splitLogDistributed(serverNames, logDirs, null);
308   }
309 
310   /**
311   * The caller will block until all the selected log files of the given region servers
312    * have been processed - successfully split or an error is encountered - by an
313    * available worker region server. This method must only be called after the
314    * region servers have been brought online.
315    *
316    * @param logDirs List of log dirs to split
317    * @param filter the Path filter to select specific files for considering
318    * @throws IOException If there was an error while splitting any log file
319    * @return cumulative size of the logfiles split
320    */
321   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
322       PathFilter filter) throws IOException {
323     MonitoredTask status = TaskMonitor.get().createStatus(
324           "Doing distributed log split in " + logDirs);
325     FileStatus[] logfiles = getFileList(logDirs, filter);
326     status.setStatus("Checking directory contents...");
327     LOG.debug("Scheduling batch of logs to split");
328     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
329     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
330     long t = EnvironmentEdgeManager.currentTimeMillis();
331     long totalSize = 0;
332     TaskBatch batch = new TaskBatch();
333     Boolean isMetaRecovery = (filter == null) ? null : false;
334     for (FileStatus lf : logfiles) {
335       // TODO If the log file is still being written to - which is most likely
336       // the case for the last log file - then its length will show up here
337       // as zero. The size of such a file can only be retrieved after
338       // recover-lease is done. totalSize will be under in most cases and the
339       // metrics that it drives will also be under-reported.
340       totalSize += lf.getLen();
341       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
342       if (!enqueueSplitTask(pathToLog, batch)) {
343         throw new IOException("duplicate log split scheduled for " + lf.getPath());
344       }
345     }
346     waitForSplittingCompletion(batch, status);
347     // remove recovering regions from ZK
348     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
349      // we split meta regions and user regions separately, therefore logfiles are either all for
350      // meta or all for user regions, but never both (we could have mixed situations only in tests)
351       isMetaRecovery = true;
352     }
353     this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
354 
355     if (batch.done != batch.installed) {
356       batch.isDead = true;
357       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
358       LOG.warn("error while splitting logs in " + logDirs +
359       " installed = " + batch.installed + " but only " + batch.done + " done");
360       String msg = "error or interrupted while splitting logs in "
361         + logDirs + " Task = " + batch;
362       status.abort(msg);
363       throw new IOException(msg);
364     }
365     for(Path logDir: logDirs){
366       status.setStatus("Cleaning up log directory...");
367       try {
368         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
369           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
370         }
371       } catch (IOException ioe) {
372         FileStatus[] files = fs.listStatus(logDir);
373         if (files != null && files.length > 0) {
374           LOG.warn("returning success without actually splitting and " + 
375               "deleting all the log files in path " + logDir);
376         } else {
377           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
378         }
379       }
380       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
381     }
382     String msg = "finished splitting (more than or equal to) " + totalSize +
383         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
384         (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
385     status.markComplete(msg);
386     LOG.info(msg);
387     return totalSize;
388   }
389 
390   /**
391    * Add a task entry to splitlog znode if it is not already there.
392    * 
393    * @param taskname the path of the log to be split
394    * @param batch the batch this task belongs to
395    * @return true if a new entry is created, false if it is already there.
396    */
397   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
398     SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
399     // This is a znode path under the splitlog dir with the rest of the path made up of an
400     // url encoding of the passed in log to split.
401     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
402     Task oldtask = createTaskIfAbsent(path, batch);
403     if (oldtask == null) {
404       // publish the task in zk
405       createNode(path, zkretries);
406       return true;
407     }
408     return false;
409   }
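
  // Descriptive note (added for clarity): the znode created above lives directly under
  // /hbase/splitlog and its name is the URL-encoded path of the WAL being split, as produced by
  // ZKSplitLog.getEncodedNodeName() and decoded again with ZKSplitLog.getFileName(). Workers
  // watch the splitlog znode for children and race to flip an UNASSIGNED task to OWNED; this
  // manager then tracks progress through the data watches set in getDataSetWatch().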
410 
411   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
412     synchronized (batch) {
413       while ((batch.done + batch.error) != batch.installed) {
414         try {
415           status.setStatus("Waiting for distributed tasks to finish. "
416               + " scheduled=" + batch.installed
417               + " done=" + batch.done
418               + " error=" + batch.error);
419           int remaining = batch.installed - (batch.done + batch.error);
420           int actual = activeTasks(batch);
421           if (remaining != actual) {
422             LOG.warn("Expected " + remaining
423               + " active tasks, but actually there are " + actual);
424           }
425           int remainingInZK = remainingTasksInZK();
426           if (remainingInZK >= 0 && actual > remainingInZK) {
427             LOG.warn("Expected at least" + actual
428               + " tasks in ZK, but actually there are " + remainingInZK);
429           }
430           if (remainingInZK == 0 || actual == 0) {
431             LOG.warn("No more task remaining (ZK or task map), splitting "
432               + "should have completed. Remaining tasks in ZK " + remainingInZK
433               + ", active tasks in map " + actual);
434             if (remainingInZK == 0 && actual == 0) {
435               return;
436             }
437           }
438           batch.wait(100);
439           if (stopper.isStopped()) {
440             LOG.warn("Stopped while waiting for log splits to be completed");
441             return;
442           }
443         } catch (InterruptedException e) {
444           LOG.warn("Interrupted while waiting for log splits to be completed");
445           Thread.currentThread().interrupt();
446           return;
447         }
448       }
449     }
450   }
451 
452   private int activeTasks(final TaskBatch batch) {
453     int count = 0;
454     for (Task t: tasks.values()) {
455       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
456         count++;
457       }
458     }
459     return count;
460   }
461 
462   private int remainingTasksInZK() {
463     int count = 0;
464     try {
465       List<String> tasks =
466         ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
467       if (tasks != null) {
468         for (String t: tasks) {
469           if (!ZKSplitLog.isRescanNode(watcher, t)) {
470             count++;
471           }
472         }
473       }
474     } catch (KeeperException ke) {
475       LOG.warn("Failed to check remaining tasks", ke);
476       count = -1;
477     }
478     return count;
479   }
480 
481   /**
482    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
483    * region server hosting the region can allow reads to the recovered region
484    * @param serverNames servers which are just recovered
485    * @param isMetaRecovery whether current recovery is for the meta region on
486   *          <code>serverNames</code>
487    */
488   private void
489       removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
490 
491     if (!this.distributedLogReplay) {
492       // the function is only used in WALEdit direct replay mode
493       return;
494     }
495 
496     final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
497     int count = 0;
498     Set<String> recoveredServerNameSet = new HashSet<String>();
499     if (serverNames != null) {
500       for (ServerName tmpServerName : serverNames) {
501         recoveredServerNameSet.add(tmpServerName.getServerName());
502       }
503     }
504 
505     try {
506       this.recoveringRegionLock.lock();
507 
508       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
509       if (tasks != null) {
510         for (String t : tasks) {
511           if (!ZKSplitLog.isRescanNode(watcher, t)) {
512             count++;
513           }
514         }
515       }
516       if (count == 0 && this.master.isInitialized()
517           && !this.master.getServerManager().areDeadServersInProgress()) {
518         // no splitting work items left
519         deleteRecoveringRegionZNodes(null);
520         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
521         // this point.
522         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
523       } else if (!recoveredServerNameSet.isEmpty()) {
524        // remove recovering regions which don't have any RS associated with them
525         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
526         if (regions != null) {
527           for (String region : regions) {
528             if(isMetaRecovery != null) {
529               if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
530                   || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
531                 // skip non-meta regions when recovering the meta region or 
532                 // skip the meta region when recovering user regions
533                 continue;
534               }
535             }
536             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
537             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
538             if (failedServers == null || failedServers.isEmpty()) {
539               ZKUtil.deleteNode(watcher, nodePath);
540               continue;
541             }
542             if (recoveredServerNameSet.containsAll(failedServers)) {
543               ZKUtil.deleteNodeRecursively(watcher, nodePath);
544             } else {
545               for (String failedServer : failedServers) {
546                 if (recoveredServerNameSet.contains(failedServer)) {
547                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
548                   ZKUtil.deleteNode(watcher, tmpPath);
549                 }
550               }
551             }
552           }
553         }
554       }
555     } catch (KeeperException ke) {
556       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
557       if (serverNames != null && !serverNames.isEmpty()) {
558         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
559             isMetaRecovery));
560       }
561     } finally {
562       this.recoveringRegionLock.unlock();
563     }
564   }
565 
566   /**
567    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
568    * during master initialization phase.
569    * @param failedServers A set of known failed servers
570    * @throws KeeperException
571    */
572   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
573       throws KeeperException, InterruptedIOException {
574 
575     if (!this.distributedLogReplay) {
576       // remove any regions in recovery from ZK which could happen when we turn the feature on
577       // and later turn it off
578       ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
579       // the function is only used in distributedLogReplay mode when master is in initialization
580       return;
581     }
582 
583     Set<String> knownFailedServers = new HashSet<String>();
584     if (failedServers != null) {
585       for (ServerName tmpServerName : failedServers) {
586         knownFailedServers.add(tmpServerName.getServerName());
587       }
588     }
589 
590     this.recoveringRegionLock.lock();
591     try {
592       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
593       if (tasks != null) {
594         for (String t : tasks) {
595           byte[] data;
596           try {
597             data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
598           } catch (InterruptedException e) {
599             throw new InterruptedIOException();
600           }
601           if (data != null) {
602             SplitLogTask slt = null;
603             try {
604               slt = SplitLogTask.parseFrom(data);
605             } catch (DeserializationException e) {
606               LOG.warn("Failed parse data for znode " + t, e);
607             }
608             if (slt != null && slt.isDone()) {
609               continue;
610             }
611           }
612           // decode the file name
613           t = ZKSplitLog.getFileName(t);
614           ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
615           if (serverName != null) {
616             knownFailedServers.add(serverName.getServerName());
617           } else {
618             LOG.warn("Found invalid WAL log file name:" + t);
619           }
620         }
621       }
622 
623      // remove recovering regions which don't have any RS associated with them
624       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
625       if (regions != null) {
626         for (String region : regions) {
627           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
628           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
629           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
630             ZKUtil.deleteNode(watcher, nodePath);
631             continue;
632           }
633           boolean needMoreRecovery = false;
634           for (String tmpFailedServer : regionFailedServers) {
635             if (knownFailedServers.contains(tmpFailedServer)) {
636               needMoreRecovery = true;
637               break;
638             }
639           }
640           if (!needMoreRecovery) {
641             ZKUtil.deleteNodeRecursively(watcher, nodePath);
642           }
643         }
644       }
645     } finally {
646       this.recoveringRegionLock.unlock();
647     }
648   }
649 
650   private void deleteRecoveringRegionZNodes(List<String> regions) {
651     try {
652       if (regions == null) {
653        // remove all children under /hbase/recovering-regions
654         LOG.info("Garbage collecting all recovering regions.");
655         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
656       } else {
657         for (String curRegion : regions) {
658           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
659           ZKUtil.deleteNodeRecursively(watcher, nodePath);
660         }
661       }
662     } catch (KeeperException e) {
663       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
664     }
665   }
666 
667   private void setDone(String path, TerminationStatus status) {
668     Task task = tasks.get(path);
669     if (task == null) {
670       if (!ZKSplitLog.isRescanNode(watcher, path)) {
671         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
672         LOG.debug("unacquired orphan task is done " + path);
673       }
674     } else {
675       synchronized (task) {
676         if (task.status == IN_PROGRESS) {
677           if (status == SUCCESS) {
678             SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
679             LOG.info("Done splitting " + path);
680           } else {
681             SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
682             LOG.warn("Error splitting " + path);
683           }
684           task.status = status;
685           if (task.batch != null) {
686             synchronized (task.batch) {
687               if (status == SUCCESS) {
688                 task.batch.done++;
689               } else {
690                 task.batch.error++;
691               }
692               task.batch.notify();
693             }
694           }
695         }
696       }
697     }
698     // delete the task node in zk. It's an async
699     // call and no one is blocked waiting for this node to be deleted. All
700     // task names are unique (log.<timestamp>) there is no risk of deleting
701     // a future task.
702     // if a deletion fails, TimeoutMonitor will retry the same deletion later
703     deleteNode(path, zkretries);
704     return;
705   }
706 
707   private void createNode(String path, Long retry_count) {
708     SplitLogTask slt = new SplitLogTask.Unassigned(serverName);
709     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
710     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
711     return;
712   }
713 
714   private void createNodeSuccess(String path) {
715     lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
716     LOG.debug("put up splitlog task at znode " + path);
717     getDataSetWatch(path, zkretries);
718   }
719 
720   private void createNodeFailure(String path) {
721     // TODO the Manager should split the log locally instead of giving up
722     LOG.warn("failed to create task node" + path);
723     setDone(path, FAILURE);
724   }
725 
726 
727   private void getDataSetWatch(String path, Long retry_count) {
728     this.watcher.getRecoverableZooKeeper().getZooKeeper().
729         getData(path, this.watcher,
730         new GetDataAsyncCallback(), retry_count);
731     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
732   }
733 
734   private void tryGetDataSetWatch(String path) {
735     // A negative retry count will lead to ignoring all error processing.
736     this.watcher.getRecoverableZooKeeper().getZooKeeper().
737         getData(path, this.watcher,
738         new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
739     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
740   }
741 
742   private void getDataSetWatchSuccess(String path, byte[] data, int version)
743   throws DeserializationException {
744     if (data == null) {
745       if (version == Integer.MIN_VALUE) {
746         // assume all done. The task znode suddenly disappeared.
747         setDone(path, SUCCESS);
748         return;
749       }
750       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
751       LOG.fatal("logic error - got null data " + path);
752       setDone(path, FAILURE);
753       return;
754     }
755     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
756     SplitLogTask slt = SplitLogTask.parseFrom(data);
757     if (slt.isUnassigned()) {
758       LOG.debug("task not yet acquired " + path + " ver = " + version);
759       handleUnassignedTask(path);
760     } else if (slt.isOwned()) {
761       heartbeat(path, version, slt.getServerName());
762     } else if (slt.isResigned()) {
763       LOG.info("task " + path + " entered state: " + slt.toString());
764       resubmitOrFail(path, FORCE);
765     } else if (slt.isDone()) {
766       LOG.info("task " + path + " entered state: " + slt.toString());
767       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
768         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
769           setDone(path, SUCCESS);
770         } else {
771           resubmitOrFail(path, CHECK);
772         }
773       } else {
774         setDone(path, SUCCESS);
775       }
776     } else if (slt.isErr()) {
777       LOG.info("task " + path + " entered state: " + slt.toString());
778       resubmitOrFail(path, CHECK);
779     } else {
780       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
781       setDone(path, FAILURE);
782     }
783   }
784 
785   private void getDataSetWatchFailure(String path) {
786     LOG.warn("failed to set data watch " + path);
787     setDone(path, FAILURE);
788   }
789 
790   /**
791    * It is possible for a task to stay in UNASSIGNED state indefinitely - say
792    * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
793    * state but it dies before it could create the RESCAN task node to signal
794    * the SplitLogWorkers to pick up the task. To prevent this scenario the
795    * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
796    *
797    * @param path
798    */
799   private void handleUnassignedTask(String path) {
800     if (ZKSplitLog.isRescanNode(watcher, path)) {
801       return;
802     }
803     Task task = findOrCreateOrphanTask(path);
804     if (task.isOrphan() && (task.incarnation == 0)) {
805       LOG.info("resubmitting unassigned orphan task " + path);
806       // ignore failure to resubmit. The timeout-monitor will handle it later
807       // albeit in a more crude fashion
808       resubmit(path, task, FORCE);
809     }
810   }
811 
812   /**
813    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
814    * @param statusCode integer value of a ZooKeeper exception code
815    * @param action description message about the retried action
816    * @return true when retries should be abandoned, otherwise false
817    */
818   private boolean needAbandonRetries(int statusCode, String action) {
819     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
820       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
821           + "action=" + action);
822       return true;
823     }
824     return false;
825   }
826 
827   private void heartbeat(String path, int new_version, ServerName workerName) {
828     Task task = findOrCreateOrphanTask(path);
829     if (new_version != task.last_version) {
830       if (task.isUnassigned()) {
831         LOG.info("task " + path + " acquired by " + workerName);
832       }
833       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
834       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
835     } else {
836       // duplicate heartbeats - heartbeats w/o zk node version
837       // changing - are possible. The timeout thread does
838       // getDataSetWatch() just to check whether a node still
839       // exists or not
840     }
841     return;
842   }
843 
844   private boolean resubmit(String path, Task task, ResubmitDirective directive) {
845     // it's OK if this thread misses the update to task.deleted. It will fail later
846     if (task.status != IN_PROGRESS) {
847       return false;
848     }
849     int version;
850     if (directive != FORCE) {
851       // We're going to resubmit:
852       //  1) immediately if the worker server is now marked as dead
853       //  2) after a configurable timeout if the server is not marked as dead but has still not
854      //       finished the task. This allows the split to continue even if the worker cannot
855      //       actually handle it, for any reason.
856       final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
857       final boolean alive = master.getServerManager() != null ?
858           master.getServerManager().isServerOnline(task.cur_worker_name) : true;
859       if (alive && time < timeout) {
860         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
861             task.cur_worker_name + " is not marked as dead, we waited for " + time +
862             " while the timeout is " + timeout);
863         return false;
864       }
865       if (task.unforcedResubmits.get() >= resubmit_threshold) {
866         if (!task.resubmitThresholdReached) {
867           task.resubmitThresholdReached = true;
868           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
869           LOG.info("Skipping resubmissions of task " + path +
870               " because threshold " + resubmit_threshold + " reached");
871         }
872         return false;
873       }
874       // race with heartbeat() that might be changing last_version
875       version = task.last_version;
876     } else {
877       SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
878       version = -1;
879     }
880     LOG.info("resubmitting task " + path);
881     task.incarnation++;
882     try {
883       // blocking zk call but this is done from the timeout thread
884       SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName);
885      if (!ZKUtil.setData(this.watcher, path, slt.toByteArray(), version)) {
886         LOG.debug("failed to resubmit task " + path +
887             " version changed");
888         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
889         return false;
890       }
891     } catch (NoNodeException e) {
892       LOG.warn("failed to resubmit because znode doesn't exist " + path +
893           " task done (or forced done by removing the znode)");
894       try {
895         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
896       } catch (DeserializationException e1) {
897         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
898         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
899         return false;
900       }
901       return false;
902     } catch (KeeperException.BadVersionException e) {
903       LOG.debug("failed to resubmit task " + path + " version changed");
904       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
905       return false;
906     } catch (KeeperException e) {
907       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
908       LOG.warn("failed to resubmit " + path, e);
909       return false;
910     }
911     // don't count forced resubmits
912     if (directive != FORCE) {
913       task.unforcedResubmits.incrementAndGet();
914     }
915     task.setUnassigned();
916     createRescanNode(Long.MAX_VALUE);
917     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
918     return true;
919   }
920 
921   private void resubmitOrFail(String path, ResubmitDirective directive) {
922     if (!resubmit(path, findOrCreateOrphanTask(path), directive)) {
923       setDone(path, FAILURE);
924     }
925   }
926 
927   private void deleteNode(String path, Long retries) {
928     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
929     // Once a task znode is ready for delete, that is it is in the TASK_DONE
930     // state, then no one should be writing to it anymore. That is no one
931     // will be updating the znode version any more.
932     this.watcher.getRecoverableZooKeeper().getZooKeeper().
933       delete(path, -1, new DeleteAsyncCallback(),
934         retries);
935   }
936 
937   private void deleteNodeSuccess(String path) {
938     if (ignoreZKDeleteForTesting) {
939       return;
940     }
941     Task task;
942     task = tasks.remove(path);
943     if (task == null) {
944       if (ZKSplitLog.isRescanNode(watcher, path)) {
945         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
946       }
947       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
948       LOG.debug("deleted task without in memory state " + path);
949       return;
950     }
951     synchronized (task) {
952       task.status = DELETED;
953       task.notify();
954     }
955     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
956   }
957 
958   private void deleteNodeFailure(String path) {
959     LOG.info("Failed to delete node " + path + " and will retry soon.");
960     return;
961   }
962 
963   /**
964    * Signal the workers that a task was resubmitted by creating the
965    * RESCAN node.
966    *
967    */
968   private void createRescanNode(long retries) {
969     // The RESCAN node will be deleted almost immediately by the
970     // SplitLogManager as soon as it is created because it is being
971     // created in the DONE state. This behavior prevents a buildup
972     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
973     // might miss the watch-trigger that creation of RESCAN node provides.
974     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
975     // therefore this behavior is safe.
976     SplitLogTask slt = new SplitLogTask.Done(this.serverName);
977     this.watcher.getRecoverableZooKeeper().getZooKeeper().
978       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
979         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
980         new CreateRescanAsyncCallback(), Long.valueOf(retries));
981   }
982 
983   private void createRescanSuccess(String path) {
984     lastNodeCreateTime = EnvironmentEdgeManager.currentTimeMillis();
985     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
986     getDataSetWatch(path, zkretries);
987   }
988 
989   private void createRescanFailure() {
990     LOG.fatal("logic failure, rescan failure must not happen");
991   }
992 
993   /**
994    * @param path
995    * @param batch
996    * @return null on success, existing task on error
997    */
998   private Task createTaskIfAbsent(String path, TaskBatch batch) {
999     Task oldtask;
1000     // batch.installed is only changed via this function and
1001     // a single thread touches batch.installed.
1002     Task newtask = new Task();
1003     newtask.batch = batch;
1004     oldtask = tasks.putIfAbsent(path, newtask);
1005     if (oldtask == null) {
1006       batch.installed++;
1007       return  null;
1008     }
1009     // new task was not used.
1010     synchronized (oldtask) {
1011       if (oldtask.isOrphan()) {
1012         if (oldtask.status == SUCCESS) {
1013           // The task is already done. Do not install the batch for this
1014           // task because it might be too late for setDone() to update
1015           // batch.done. There is no need for the batch creator to wait for
1016           // this task to complete.
1017           return (null);
1018         }
1019         if (oldtask.status == IN_PROGRESS) {
1020           oldtask.batch = batch;
1021           batch.installed++;
1022           LOG.debug("Previously orphan task " + path + " is now being waited upon");
1023           return null;
1024         }
1025         while (oldtask.status == FAILURE) {
1026           LOG.debug("wait for status of task " + path + " to change to DELETED");
1027           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1028           try {
1029             oldtask.wait();
1030           } catch (InterruptedException e) {
1031             Thread.currentThread().interrupt();
1032             LOG.warn("Interrupted when waiting for znode delete callback");
1033             // fall through to return failure
1034             break;
1035           }
1036         }
1037         if (oldtask.status != DELETED) {
1038           LOG.warn("Failure because previously failed task" +
1039               " state still present. Waiting for znode delete callback" +
1040               " path=" + path);
1041           return oldtask;
1042         }
1043         // reinsert the newTask and it must succeed this time
1044         Task t = tasks.putIfAbsent(path, newtask);
1045         if (t == null) {
1046           batch.installed++;
1047           return  null;
1048         }
1049         LOG.fatal("Logic error. Deleted task still present in tasks map");
1050         assert false : "Deleted task still present in tasks map";
1051         return t;
1052       }
1053       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1054       return oldtask;
1055     }
1056   }
1057 
1058   Task findOrCreateOrphanTask(String path) {
1059     Task orphanTask = new Task();
1060     Task task;
1061     task = tasks.putIfAbsent(path, orphanTask);
1062     if (task == null) {
1063       LOG.info("creating orphan task " + path);
1064       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1065       task = orphanTask;
1066     }
1067     return task;
1068   }
1069 
1070   @Override
1071   public void nodeDataChanged(String path) {
1072     Task task;
1073     task = tasks.get(path);
1074     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1075       if (task != null) {
1076         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1077       }
1078       getDataSetWatch(path, zkretries);
1079     }
1080   }
1081 
1082   public void stop() {
1083     if (timeoutMonitor != null) {
1084       timeoutMonitor.interrupt();
1085     }
1086   }
1087 
1088   private void lookForOrphans() {
1089     List<String> orphans;
1090     try {
1091        orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1092           this.watcher.splitLogZNode);
1093       if (orphans == null) {
1094         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1095         return;
1096       }
1097     } catch (KeeperException e) {
1098       LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1099           " " + StringUtils.stringifyException(e));
1100       return;
1101     }
1102     int rescan_nodes = 0;
1103     for (String path : orphans) {
1104       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1105       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1106         rescan_nodes++;
1107         LOG.debug("found orphan rescan node " + path);
1108       } else {
1109         LOG.info("found orphan task " + path);
1110       }
1111       getDataSetWatch(nodepath, zkretries);
1112     }
1113     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1114         rescan_nodes + " rescan nodes");
1115   }
1116 
1117   /**
1118    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
1119   * all regions of the passed-in region server
1120    * @param serverName the name of a region server
1121   * @param userRegions user regions assigned on the region server
1122    */
1123   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1124       throws KeeperException, InterruptedIOException {
1125     if (userRegions == null || !this.distributedLogReplay) {
1126       return;
1127     }
1128 
1129     try {
1130       this.recoveringRegionLock.lock();
1131       // mark that we're creating recovering znodes
1132       this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1133 
1134       for (HRegionInfo region : userRegions) {
1135         String regionEncodeName = region.getEncodedName();
1136         long retries = this.zkretries;
1137 
1138         do {
1139           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1140           long lastRecordedFlushedSequenceId = -1;
1141           try {
1142             long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1143               regionEncodeName.getBytes());
1144 
1145             /*
1146              * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
1147              * flushed sequence id for the server]
1148              */
1149             byte[] data = ZKUtil.getData(this.watcher, nodePath);
1150             if (data == null) {
1151               ZKUtil.createSetData(this.watcher, nodePath,
1152                 ZKUtil.positionToByteArray(lastSequenceId));
1153             } else {
1154               lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1155               if (lastRecordedFlushedSequenceId < lastSequenceId) {
1156                 // update last flushed sequence id in the region level
1157                 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1158               }
1159             }
1160             // go one level deeper with server name
1161             nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1162             if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1163               // the newly assigned RS failed even before any flush to the region
1164               lastSequenceId = lastRecordedFlushedSequenceId;
1165             }
1166             ZKUtil.createSetData(this.watcher, nodePath,
1167               ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1168             LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1169                 + serverName);
1170 
1171             // break retry loop
1172             break;
1173           } catch (KeeperException e) {
1174             // ignore ZooKeeper exceptions inside retry loop
1175             if (retries <= 1) {
1176               throw e;
1177             }
1178             // wait a little bit for retry
1179             try {
1180               Thread.sleep(20);
1181             } catch (InterruptedException e1) {
1182               throw new InterruptedIOException();
1183             }
1184           } catch (InterruptedException e) {
1185             throw new InterruptedIOException();
1186           }
1187         } while ((--retries) > 0 && (!this.stopper.isStopped()));
1188       }
1189     } finally {
1190       this.recoveringRegionLock.unlock();
1191     }
1192   }
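
  // Descriptive note (added for clarity): after the loop above, a recovering region is represented
  // in ZK roughly as
  //   /hbase/recovering-regions/<encoded region name>                  data = last flushed seq id of the region
  //   /hbase/recovering-regions/<encoded region name>/<failed server>  data = last flushed seq id known for that server
  // removeRecoveringRegionsFromZK() later deletes the per-server child once that server's logs are
  // replayed and removes the region znode when no failed servers remain under it.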
1193 
1194   /**
1195    * @param bytes - Content of a failed region server or recovering region znode.
1196    * @return long - The last flushed sequence Id for the region server
1197    */
1198   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1199     long lastRecordedFlushedSequenceId = -1L;
1200     try {
1201       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1202     } catch (DeserializationException e) {
1203       lastRecordedFlushedSequenceId = -1L;
1204       LOG.warn("Can't parse last flushed sequence Id", e);
1205     }
1206     return lastRecordedFlushedSequenceId;
1207   }
1208 
1209   /**
1210   * Check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if it
1211   * exists and sets a watcher as well.
1212    * @param zkw
1213    * @param regionEncodedName region encode name
1214    * @return true when /hbase/recovering-regions/<current region encoded name> exists
1215    * @throws KeeperException
1216    */
1217   public static boolean
1218       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1219           throws KeeperException {
1220     boolean result = false;
1221     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1222 
1223     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1224     if (node != null) {
1225       result = true;
1226     }
1227     return result;
1228   }
1229 
1230   /**
1231    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
1232    * @param zkw
1233    * @param serverName
1234    * @param encodedRegionName
1235   * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
1236    * @throws IOException
1237    */
1238   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1239       String serverName, String encodedRegionName) throws IOException {
1240     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
1241     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
1242    // If the newly assigned RS fails again (a chained RS failure scenario), the last flushed
1243    // sequence Id namespace (a sequence Id is only valid for a particular RS instance) changes
1244    // when a different newly assigned RS flushes the region.
1245     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
1246     // last flushed sequence Id for each failed RS instance.
1247     RegionStoreSequenceIds result = null;
1248     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1249     nodePath = ZKUtil.joinZNode(nodePath, serverName);
1250     try {
1251       byte[] data;
1252       try {
1253         data = ZKUtil.getData(zkw, nodePath);
1254       } catch (InterruptedException e) {
1255         throw new InterruptedIOException();
1256       }
1257       if (data != null) {
1258         result = ZKUtil.parseRegionStoreSequenceIds(data);
1259       }
1260     } catch (KeeperException e) {
1261       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1262           + serverName + "; region=" + encodedRegionName, e);
1263     } catch (DeserializationException e) {
1264       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1265     }
1266     return result;
1267   }
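
  // A hedged usage sketch (caller-side, not part of this class): during distributedLogReplay a
  // worker replaying edits of a region previously hosted on a failed server could consult ZK to
  // skip edits that were already flushed. Variable names below are illustrative assumptions.
  //
  //   RegionStoreSequenceIds ids = SplitLogManager.getRegionFlushedSequenceId(zkw,
  //       failedServer.getServerName(), regionInfo.getEncodedName());
  //   long lastFlushed = (ids == null) ? -1L : ids.getLastFlushedSequenceId();
  //   // edits with a sequence id <= lastFlushed can be dropped during replay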
1268 
1269   /**
1270    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
1271   * Client threads use this object to wait for all their tasks to be done.
1272    * <p>
1273    * All access is synchronized.
1274    */
1275   static class TaskBatch {
1276     int installed = 0;
1277     int done = 0;
1278     int error = 0;
1279     volatile boolean isDead = false;
1280 
1281     @Override
1282     public String toString() {
1283       return ("installed = " + installed + " done = " + done + " error = " + error);
1284     }
1285   }
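
  // Minimal sketch (editorial addition) of the wait pattern implied by the Javadoc above: the
  // thread that installed tasks into a TaskBatch blocks on the batch monitor until every
  // installed task has been counted as done or error (InterruptedException handling omitted).
  //
  //   synchronized (batch) {
  //     while ((batch.done + batch.error) != batch.installed) {
  //       batch.wait(100);
  //     }
  //   }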
1286 
1287   /**
1288    * In-memory state of an active task.
1289    */
1290   static class Task {
1291     volatile long last_update;
1292     volatile int last_version;
1293     volatile ServerName cur_worker_name;
1294     volatile TaskBatch batch;
1295     volatile TerminationStatus status;
1296     volatile int incarnation;
1297     final AtomicInteger unforcedResubmits = new AtomicInteger();
1298     volatile boolean resubmitThresholdReached;
1299 
1300     @Override
1301     public String toString() {
1302       return ("last_update = " + last_update +
1303           " last_version = " + last_version +
1304           " cur_worker_name = " + cur_worker_name +
1305           " status = " + status +
1306           " incarnation = " + incarnation +
1307           " resubmits = " + unforcedResubmits.get() +
1308           " batch = " + batch);
1309     }
1310 
1311     Task() {
1312       incarnation = 0;
1313       last_version = -1;
1314       status = IN_PROGRESS;
1315       setUnassigned();
1316     }
1317 
1318     public boolean isOrphan() {
1319       return (batch == null || batch.isDead);
1320     }
1321 
1322     public boolean isUnassigned() {
1323       return (cur_worker_name == null);
1324     }
1325 
1326     public void heartbeatNoDetails(long time) {
1327       last_update = time;
1328     }
1329 
1330     public void heartbeat(long time, int version, ServerName worker) {
1331       last_version = version;
1332       last_update = time;
1333       cur_worker_name = worker;
1334     }
1335 
1336     public void setUnassigned() {
1337       cur_worker_name = null;
1338       last_update = -1;
1339     }
1340   }
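
  // Minimal sketch (editorial addition) of how the Task fields above are typically driven: a
  // worker heartbeat refreshes last_update/last_version/cur_worker_name, while resubmission puts
  // the task back into the unassigned state so another worker can pick it up. "version" and
  // "workerName" are assumed to come from the task znode and its data.
  //
  //   Task t = new Task();   // starts IN_PROGRESS and unassigned (cur_worker_name == null)
  //   t.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), version, workerName);
  //   ...
  //   t.setUnassigned();     // clears cur_worker_name and resets last_update to -1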
1341 
1342   void handleDeadWorker(ServerName workerName) {
1343     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
1344     // to reason about concurrency. Makes it easier to retry.
1345     synchronized (deadWorkersLock) {
1346       if (deadWorkers == null) {
1347         deadWorkers = new HashSet<ServerName>(100);
1348       }
1349       deadWorkers.add(workerName);
1350     }
1351     LOG.info("dead splitlog worker " + workerName);
1352   }
1353 
1354   void handleDeadWorkers(Set<ServerName> serverNames) {
1355     synchronized (deadWorkersLock) {
1356       if (deadWorkers == null) {
1357         deadWorkers = new HashSet<ServerName>(100);
1358       }
1359       deadWorkers.addAll(serverNames);
1360     }
1361     LOG.info("dead splitlog workers " + serverNames);
1362   }
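
  // Illustrative sketch (editorial addition): the expectation is that server-shutdown handling
  // calls one of the methods above, after which the TimeoutMonitor chore force-resubmits the
  // dead workers' tasks on its next run. "manager", "failedServer" and "failedServers" are
  // assumed names from the calling context.
  //
  //   manager.handleDeadWorker(failedServer);
  //   // or, for several servers at once:
  //   manager.handleDeadWorkers(new HashSet<ServerName>(failedServers));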
1363 
1364   /**
1365    * Periodically checks all active tasks and resubmits the ones that have timed
1366    * out.
1367    */
1368   private class TimeoutMonitor extends Chore {
1369     private long lastLog = 0;
1370 
1371     public TimeoutMonitor(final int period, Stoppable stopper) {
1372       super("SplitLogManager Timeout Monitor", period, stopper);
1373     }
1374 
1375     @Override
1376     protected void chore() {
1377       int resubmitted = 0;
1378       int unassigned = 0;
1379       int tot = 0;
1380       boolean found_assigned_task = false;
1381       Set<ServerName> localDeadWorkers;
1382 
1383       synchronized (deadWorkersLock) {
1384         localDeadWorkers = deadWorkers;
1385         deadWorkers = null;
1386       }
1387 
1388       for (Map.Entry<String, Task> e : tasks.entrySet()) {
1389         String path = e.getKey();
1390         Task task = e.getValue();
1391         ServerName cur_worker = task.cur_worker_name;
1392         tot++;
1393         // don't easily resubmit a task which hasn't been picked up yet. It
1394         // might be a long while before a SplitLogWorker is free to pick up a
1395         // task. This is because a SplitLogWorker picks up a task one at a
1396         // time. If we want progress when there are no region servers then we
1397         // will have to run a SplitLogWorker thread in the Master.
1398         if (task.isUnassigned()) {
1399           unassigned++;
1400           continue;
1401         }
1402         found_assigned_task = true;
1403         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1404           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1405           if (resubmit(path, task, FORCE)) {
1406             resubmitted++;
1407           } else {
1408             handleDeadWorker(cur_worker);
1409             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1410                 cur_worker + ", will retry.");
1411           }
1412         } else if (resubmit(path, task, CHECK)) {
1413           resubmitted++;
1414         }
1415       }
1416       if (tot > 0) {
1417         long now = EnvironmentEdgeManager.currentTimeMillis();
1418         if (now > lastLog + 5000) {
1419           lastLog = now;
1420           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1421         }
1422       }
1423       if (resubmitted > 0) {
1424         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1425       }
1426       // If there are pending tasks and all of them have been unassigned for
1427       // some time then put up a RESCAN node to ping the workers.
1428       // ZKSplitLog.DEFAULT_UNASSIGNED_TIMEOUT is on the order of minutes
1429       // because (a) it is very unlikely that every worker had a
1430       // transient error when trying to grab the task and (b) if there are no
1431       // workers then all tasks will stay unassigned indefinitely and the
1432       // manager will be indefinitely creating RESCAN nodes. TODO: maybe the
1433       // master should spawn both a manager and a worker thread to guarantee
1434       // that there is always one worker in the system.
1435       if (tot > 0 && !found_assigned_task &&
1436           ((EnvironmentEdgeManager.currentTimeMillis() - lastNodeCreateTime) >
1437           unassignedTimeout)) {
1438         for (Map.Entry<String, Task> e : tasks.entrySet()) {
1439           String path = e.getKey();
1440           Task task = e.getValue();
1441           // we have to do task.isUnassigned() check again because tasks might
1442           // have been asynchronously assigned. There is no locking required
1443           // for these checks ... it is OK even if tryGetDataSetWatch() is
1444           // called unnecessarily for a task
1445           if (task.isUnassigned() && (task.status != FAILURE)) {
1446             // We just touch the znode to make sure its still there
1447             tryGetDataSetWatch(path);
1448           }
1449         }
1450         createRescanNode(Long.MAX_VALUE);
1451         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1452         LOG.debug("resubmitting unassigned task(s) after timeout");
1453       }
1454 
1455       // Retry previously failed deletes
1456       if (failedDeletions.size() > 0) {
1457         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1458         for (String tmpPath : tmpPaths) {
1459           // deleteNode is an async call
1460           deleteNode(tmpPath, zkretries);
1461         }
1462         failedDeletions.removeAll(tmpPaths);
1463       }
1464 
1465       // Garbage collect left-over /hbase/recovering-regions/... znodes
1466       long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1467           - lastRecoveringNodeCreationTime;
1468       if (!failedRecoveringRegionDeletions.isEmpty()
1469           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1470         // inside the function there are more checks before GC'ing anything
1471         if (!failedRecoveringRegionDeletions.isEmpty()) {
1472           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1473               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1474           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1475           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1476             removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1477           }
1478         } else {
1479           removeRecoveringRegionsFromZK(null, null);
1480         }
1481       }
1482     }
1483   }
1484 
1485   /**
1486    * Asynchronous handler for zk create node results.
1487    * Retries on failures.
1488    */
1489   class CreateAsyncCallback implements AsyncCallback.StringCallback {
1490     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1491 
1492     @Override
1493     public void processResult(int rc, String path, Object ctx, String name) {
1494       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1495       if (rc != 0) {
1496         if (needAbandonRetries(rc, "Create znode " + path)) {
1497           createNodeFailure(path);
1498           return;
1499         }
1500         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1501           // What if there is a delete pending against this pre-existing
1502           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
1503           // state. Only operations that will be carried out on this node by
1504           // this manager are get-znode-data, task-finisher and delete-znode.
1505           // And all code pieces correctly handle the case of suddenly
1506           // disappearing task-znode.
1507           LOG.debug("found pre-existing znode " + path);
1508           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1509         } else {
1510           Long retry_count = (Long)ctx;
1511           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1512               path + " remaining retries=" + retry_count);
1513           if (retry_count == 0) {
1514             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1515             createNodeFailure(path);
1516           } else {
1517             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1518             createNode(path, retry_count - 1);
1519           }
1520           return;
1521         }
1522       }
1523       createNodeSuccess(path);
1524     }
1525   }
1526 
1527   /**
1528    * Asynchronous handler for zk get-data-set-watch on node results.
1529    * Retries on failures.
1530    */
1531   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1532     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1533 
1534     @Override
1535     public void processResult(int rc, String path, Object ctx, byte[] data,
1536         Stat stat) {
1537       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1538       if (rc != 0) {
1539         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1540           return;
1541         }
1542         if (rc == KeeperException.Code.NONODE.intValue()) {
1543           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1544           // The task znode has been deleted. Must be some pending delete
1545           // that deleted the task. Assume success because a task-znode
1546           // is only deleted after TaskFinisher is successful.
1547           LOG.warn("task znode " + path + " vanished.");
1548           try {
1549             getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
1550           } catch (DeserializationException e) {
1551             LOG.warn("Deserialization problem", e);
1552           }
1553           return;
1554         }
1555         Long retry_count = (Long) ctx;
1556 
1557         if (retry_count < 0) {
1558           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1559               path + ". Ignoring error. No error handling. No retrying.");
1560           return;
1561         }
1562         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1563             path + " remaining retries=" + retry_count);
1564         if (retry_count == 0) {
1565           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1566           getDataSetWatchFailure(path);
1567         } else {
1568           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1569           getDataSetWatch(path, retry_count - 1);
1570         }
1571         return;
1572       }
1573       try {
1574         getDataSetWatchSuccess(path, data, stat.getVersion());
1575       } catch (DeserializationException e) {
1576         LOG.warn("Deserialization problem", e);
1577       }
1578       return;
1579     }
1580   }
1581 
1582   /**
1583    * Asynchronous handler for zk delete node results.
1584    * Retries on failures.
1585    */
1586   class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1587     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1588 
1589     @Override
1590     public void processResult(int rc, String path, Object ctx) {
1591       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1592       if (rc != 0) {
1593         if (needAbandonRetries(rc, "Delete znode " + path)) {
1594           failedDeletions.add(path);
1595           return;
1596         }
1597         if (rc != KeeperException.Code.NONODE.intValue()) {
1598           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1599           Long retry_count = (Long) ctx;
1600           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1601               path + " remaining retries=" + retry_count);
1602           if (retry_count == 0) {
1603             LOG.warn("delete failed " + path);
1604             failedDeletions.add(path);
1605             deleteNodeFailure(path);
1606           } else {
1607             deleteNode(path, retry_count - 1);
1608           }
1609           return;
1610         } else {
1611           LOG.info(path +
1612             " does not exist. Either was created but deleted behind our" +
1613             " back by another pending delete OR was deleted" +
1614             " in earlier retry rounds. zkretries = " + (Long) ctx);
1615         }
1616       } else {
1617         LOG.debug("deleted " + path);
1618       }
1619       deleteNodeSuccess(path);
1620     }
1621   }
1622 
1623   /**
1624    * Asynchronous handler for zk create RESCAN-node results.
1625    * Retries on failures.
1626    * <p>
1627    * A RESCAN node is created using the PERSISTENT_SEQUENTIAL flag. It is a signal
1628    * for all the {@link SplitLogWorker}s to rescan for new tasks.
1629    */
1630   class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1631     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1632 
1633     @Override
1634     public void processResult(int rc, String path, Object ctx, String name) {
1635       if (rc != 0) {
1636         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1637           return;
1638         }
1639         Long retry_count = (Long)ctx;
1640         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1641             " remaining retries=" + retry_count);
1642         if (retry_count == 0) {
1643           createRescanFailure();
1644         } else {
1645           createRescanNode(retry_count - 1);
1646         }
1647         return;
1648       }
1649       // path is the original arg, name is the actual name that was created
1650       createRescanSuccess(name);
1651     }
1652   }
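
  // Minimal sketch (editorial addition) of the PERSISTENT_SEQUENTIAL creation the Javadoc above
  // refers to, using the raw org.apache.zookeeper client API; "zk" is assumed to be a connected
  // ZooKeeper handle and "rescanPath" the RESCAN path under the split log znode. Each create
  // yields a unique, ordered node name that workers notice as a signal to rescan for tasks.
  //
  //   String created = zk.create(rescanPath, new byte[0],
  //       ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);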
1653 
1654   /**
1655    * {@link SplitLogManager} can use objects implementing this interface to
1656    * finish off a task partially done by a {@link SplitLogWorker}. This provides
1657    * a serialization point at the end of the task processing. Must be
1658    * restartable and idempotent.
1659    */
1660   public interface TaskFinisher {
1661     /**
1662      * Status that can be returned by finish()
1663      */
1664     enum Status {
1665       /**
1666        * task completed successfully
1667        */
1668       DONE(),
1669       /**
1670        * task completed with error
1671        */
1672       ERR();
1673     }
1674     /**
1675      * Finish the partially done task. The worker name provides a clue to where the
1676      * partial results of the partially done task are present. The task name is the
1677      * name of the task that was put up in zookeeper.
1678      * <p>
1679      * @param workerName name of the worker that did the partial work
1680      * @param taskname name of the task znode put up in zookeeper
1681      * @return DONE if task completed successfully, ERR otherwise
1682      */
1683     Status finish(ServerName workerName, String taskname);
1684   }
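
  // Minimal sketch (editorial addition) of a TaskFinisher implementation: the instance that
  // SplitLogManager installs typically archives the fully-processed WAL file, but any
  // restartable, idempotent finishing step satisfies the contract above.
  //
  //   TaskFinisher finisher = new TaskFinisher() {
  //     @Override
  //     public Status finish(ServerName workerName, String taskname) {
  //       // idempotent cleanup keyed by the task (znode) name; safe to run more than once
  //       return Status.DONE;
  //     }
  //   };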
1685 
1686   enum ResubmitDirective {
1687     CHECK(),
1688     FORCE();
1689   }
1690 
1691   enum TerminationStatus {
1692     IN_PROGRESS("in_progress"),
1693     SUCCESS("success"),
1694     FAILURE("failure"),
1695     DELETED("deleted");
1696 
1697     String statusMsg;
1698     TerminationStatus(String msg) {
1699       statusMsg = msg;
1700     }
1701     
1702     @Override
1703     public String toString() {
1704       return statusMsg;
1705     }
1706   }
1707 }