
1   /**
2     * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.master;
19  
20  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK;
21  import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE;
22  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED;
23  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.FAILURE;
24  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.IN_PROGRESS;
25  import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.SUCCESS;
26  
27  import java.io.IOException;
28  import java.io.InterruptedIOException;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.HashSet;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.ConcurrentHashMap;
36  import java.util.concurrent.ConcurrentMap;
37  import java.util.concurrent.atomic.AtomicInteger;
38  import java.util.concurrent.locks.ReentrantLock;
39  
40  import org.apache.commons.logging.Log;
41  import org.apache.commons.logging.LogFactory;
42  import org.apache.hadoop.classification.InterfaceAudience;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.fs.FileStatus;
45  import org.apache.hadoop.fs.FileSystem;
46  import org.apache.hadoop.fs.Path;
47  import org.apache.hadoop.fs.PathFilter;
48  import org.apache.hadoop.hbase.Chore;
49  import org.apache.hadoop.hbase.HConstants;
50  import org.apache.hadoop.hbase.HRegionInfo;
51  import org.apache.hadoop.hbase.ServerName;
52  import org.apache.hadoop.hbase.SplitLogCounters;
53  import org.apache.hadoop.hbase.SplitLogTask;
54  import org.apache.hadoop.hbase.Stoppable;
55  import org.apache.hadoop.hbase.exceptions.DeserializationException;
56  import org.apache.hadoop.hbase.io.hfile.HFile;
57  import org.apache.hadoop.hbase.master.SplitLogManager.TaskFinisher.Status;
58  import org.apache.hadoop.hbase.monitoring.MonitoredTask;
59  import org.apache.hadoop.hbase.monitoring.TaskMonitor;
60  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
61  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
62  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
63  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
64  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
65  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
66  import org.apache.hadoop.hbase.util.FSUtils;
67  import org.apache.hadoop.hbase.util.Pair;
68  import org.apache.hadoop.hbase.util.Threads;
69  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
70  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
71  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
72  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
73  import org.apache.hadoop.util.StringUtils;
74  import org.apache.zookeeper.AsyncCallback;
75  import org.apache.zookeeper.CreateMode;
76  import org.apache.zookeeper.KeeperException;
77  import org.apache.zookeeper.KeeperException.NoNodeException;
78  import org.apache.zookeeper.ZooDefs.Ids;
79  import org.apache.zookeeper.data.Stat;
80  
81  /**
82   * Distributes the task of log splitting to the available region servers.
83   * Coordination happens via zookeeper. For every log file that has to be split a
84   * znode is created under <code>/hbase/splitlog</code>. SplitLogWorkers race to grab a task.
85   *
86   * <p>SplitLogManager monitors the task znodes that it creates using the
87   * timeoutMonitor thread. If a task's progress is slow then
88   * {@link #resubmit(String, Task, ResubmitDirective)} will take away the task from the owner
89   * {@link SplitLogWorker} and the task will be up for grabs again. When the task is done then the
90   * task's znode is deleted by SplitLogManager.
91   *
92   * <p>Clients call {@link #splitLogDistributed(Path)} to split a region server's
93   * log files. The caller thread waits in this method until all the log files
94   * have been split.
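     *
     * <p>A minimal, illustrative sketch of how the master uses this class (the log directory path
     * below is a hypothetical example):
     * <pre>
     *   SplitLogManager slm = new SplitLogManager(zkw, conf, stopper, masterServices, serverName);
     *   // blocks until every log file under the directory has been split by the workers
     *   long bytesSplit = slm.splitLogDistributed(new Path("/hbase/.logs/rs1,60020,1388530955991"));
     * </pre>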
95   *
96   * <p>All the zookeeper calls made by this class are asynchronous. This is mainly
97   * to help reduce response time seen by the callers.
98   *
99   * <p>There is a race in this design between the SplitLogManager and the
100  * SplitLogWorker. SplitLogManager might re-queue a task that has in reality
101  * already been completed by a SplitLogWorker. We rely on the idempotency of
102  * the log splitting task for correctness.
103  *
104  * <p>It is also assumed that every log splitting task is unique and once
105  * completed (either with success or with error) it will not be submitted
106  * again. If a task is resubmitted then there is a risk that old "delete task"
107  * can delete the re-submission.
108  */
109 @InterfaceAudience.Private
110 public class SplitLogManager extends ZooKeeperListener {
111   private static final Log LOG = LogFactory.getLog(SplitLogManager.class);
112 
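      // These defaults can be overridden via hbase.splitlog.manager.timeout,
      // hbase.splitlog.zk.retries, hbase.splitlog.max.resubmit and
      // hbase.splitlog.manager.unassigned.timeout respectively (read in the constructor).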
113   public static final int DEFAULT_TIMEOUT = 120000;
114   public static final int DEFAULT_ZK_RETRIES = 3;
115   public static final int DEFAULT_MAX_RESUBMIT = 3;
116   public static final int DEFAULT_UNASSIGNED_TIMEOUT = (3 * 60 * 1000); //3 min
117 
118   private final Stoppable stopper;
119   private final MasterServices master;
120   private final ServerName serverName;
121   private final TaskFinisher taskFinisher;
122   private FileSystem fs;
123   private Configuration conf;
124 
125   private long zkretries;
126   private long resubmit_threshold;
127   private long timeout;
128   private long unassignedTimeout;
129   private long lastTaskCreateTime = Long.MAX_VALUE;
130   public boolean ignoreZKDeleteForTesting = false;
131   private volatile long lastRecoveringNodeCreationTime = 0;
132   // When lastRecoveringNodeCreationTime is older than the following threshold, we'll check
133   // whether to GC stale recovering znodes
134   private long checkRecoveringTimeThreshold = 15000; // 15 seconds
135   private final List<Pair<Set<ServerName>, Boolean>> failedRecoveringRegionDeletions = Collections
136       .synchronizedList(new ArrayList<Pair<Set<ServerName>, Boolean>>());
137 
138   /**
139    * In distributedLogReplay mode, we need to touch both splitlog and recovering-regions znodes in one
140    * operation. So the lock is used to guard such cases.
141    */
142   protected final ReentrantLock recoveringRegionLock = new ReentrantLock();
143 
144   private volatile RecoveryMode recoveryMode;
145   private volatile boolean isDrainingDone = false;
146 
147   private final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<String, Task>();
148   private TimeoutMonitor timeoutMonitor;
149 
150   private volatile Set<ServerName> deadWorkers = null;
151   private final Object deadWorkersLock = new Object();
152 
153   private Set<String> failedDeletions = null;
154 
155   /**
156    * Wrapper around {@link #SplitLogManager(ZooKeeperWatcher zkw, Configuration conf,
157    *   Stoppable stopper, MasterServices master, ServerName serverName, TaskFinisher tf)}
158    * that provides a task finisher for copying recovered edits to their final destination.
159    * The task finisher has to be robust because it can be arbitrarily restarted or called
160    * multiple times.
161    *
162    * @param zkw the ZK watcher
163    * @param conf the HBase configuration
164    * @param stopper the stoppable in case anything is wrong
165    * @param master the master services
166    * @param serverName the master server name
167    * @throws KeeperException 
168    * @throws InterruptedIOException 
169    */
170   public SplitLogManager(ZooKeeperWatcher zkw, final Configuration conf,
171       Stoppable stopper, MasterServices master, ServerName serverName) 
172       throws InterruptedIOException, KeeperException {
173     this(zkw, conf, stopper, master, serverName, new TaskFinisher() {
174       @Override
175       public Status finish(ServerName workerName, String logfile) {
176         try {
177           HLogSplitter.finishSplitLogFile(logfile, conf);
178         } catch (IOException e) {
179           LOG.warn("Could not finish splitting of log file " + logfile, e);
180           return Status.ERR;
181         }
182         return Status.DONE;
183       }
184     });
185   }
186 
187   /**
188    * It's OK to construct this object even when region servers are not online. It does look up
189    * the orphan tasks in zk but it doesn't block waiting for them to be done.
190    * @param zkw the ZK watcher
191    * @param conf the HBase configuration
192    * @param stopper the stoppable in case anything is wrong
193    * @param master the master services
194    * @param serverName the master server name
195    * @param tf task finisher
196    * @throws KeeperException
197    * @throws InterruptedIOException
198    */
199   public SplitLogManager(ZooKeeperWatcher zkw, Configuration conf, Stoppable stopper,
200       MasterServices master, ServerName serverName, TaskFinisher tf) throws InterruptedIOException,
201       KeeperException {
202     super(zkw);
203     this.taskFinisher = tf;
204     this.conf = conf;
205     this.stopper = stopper;
206     this.master = master;
207     this.zkretries = conf.getLong("hbase.splitlog.zk.retries", DEFAULT_ZK_RETRIES);
208     this.resubmit_threshold = conf.getLong("hbase.splitlog.max.resubmit", DEFAULT_MAX_RESUBMIT);
209     this.timeout = conf.getInt("hbase.splitlog.manager.timeout", DEFAULT_TIMEOUT);
210     this.unassignedTimeout =
211       conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
212 
213     // Determine recovery mode  
214     setRecoveryMode(true);
215 
216     LOG.info("Timeout=" + timeout + ", unassigned timeout=" + unassignedTimeout +
217       ", distributedLogReplay=" + (this.recoveryMode == RecoveryMode.LOG_REPLAY));
218 
219     this.serverName = serverName;
220     this.timeoutMonitor = new TimeoutMonitor(
221       conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), stopper);
222 
223     this.failedDeletions = Collections.synchronizedSet(new HashSet<String>());
224 
225     Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), serverName
226       + ".splitLogManagerTimeoutMonitor");
227     // Watcher can be null during tests with Mock'd servers.
228     if (this.watcher != null) {
229       this.watcher.registerListener(this);
230       lookForOrphans();
231     }
232   }
233 
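      /**
       * Gathers the log files to split by listing each of the given log directories, applying the
       * optional filter. Missing or empty directories are skipped with a log message.
       */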
234   private FileStatus[] getFileList(List<Path> logDirs, PathFilter filter) throws IOException {
235     List<FileStatus> fileStatus = new ArrayList<FileStatus>();
236     for (Path hLogDir : logDirs) {
237       this.fs = hLogDir.getFileSystem(conf);
238       if (!fs.exists(hLogDir)) {
239         LOG.warn(hLogDir + " doesn't exist. Nothing to do!");
240         continue;
241       }
242       FileStatus[] logfiles = FSUtils.listStatus(fs, hLogDir, filter);
243       if (logfiles == null || logfiles.length == 0) {
244         LOG.info(hLogDir + " is empty dir, no logs to split");
245       } else {
246         for (FileStatus status : logfiles)
247           fileStatus.add(status);
248       }
249     }
250     FileStatus[] a = new FileStatus[fileStatus.size()];
251     return fileStatus.toArray(a);
252   }
253 
254   /**
255    * @param logDir
256    *            one region server hlog dir path in .logs
257    * @throws IOException
258    *             if there was an error while splitting any log file
259    * @return cumulative size of the logfiles split
261    */
262   public long splitLogDistributed(final Path logDir) throws IOException {
263     List<Path> logDirs = new ArrayList<Path>();
264     logDirs.add(logDir);
265     return splitLogDistributed(logDirs);
266   }
267 
268   /**
269    * The caller will block until all the log files of the given region server
270    * have been processed - successfully split or an error is encountered - by an
271    * available worker region server. This method must only be called after the
272    * region servers have been brought online.
273    *
274    * @param logDirs List of log dirs to split
275    * @throws IOException If there was an error while splitting any log file
276    * @return cumulative size of the logfiles split
277    */
278   public long splitLogDistributed(final List<Path> logDirs) throws IOException {
279     if (logDirs.isEmpty()) {
280       return 0;
281     }
282     Set<ServerName> serverNames = new HashSet<ServerName>();
283     for (Path logDir : logDirs) {
284       try {
285         ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(logDir);
286         if (serverName != null) {
287           serverNames.add(serverName);
288         }
289       } catch (IllegalArgumentException e) {
290         // ignore invalid format error.
291         LOG.warn("Cannot parse server name from " + logDir);
292       }
293     }
294     return splitLogDistributed(serverNames, logDirs, null);
295   }
296 
297   /**
298    * The caller will block until all the log files of the given region servers
299    * have been processed - successfully split or an error is encountered - by an
300    * available worker region server. This method must only be called after the
301    * region servers have been brought online.
302    *
       * @param serverNames region servers whose WALs are to be split
303    * @param logDirs List of log dirs to split
304    * @param filter the Path filter to select specific files for considering
305    * @throws IOException If there was an error while splitting any log file
306    * @return cumulative size of the logfiles split
307    */
308   public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
309       PathFilter filter) throws IOException {
310     MonitoredTask status = TaskMonitor.get().createStatus(
311           "Doing distributed log split in " + logDirs);
312     FileStatus[] logfiles = getFileList(logDirs, filter);
313     status.setStatus("Checking directory contents...");
314     LOG.debug("Scheduling batch of logs to split");
315     SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet();
316     LOG.info("started splitting " + logfiles.length + " logs in " + logDirs);
317     long t = EnvironmentEdgeManager.currentTimeMillis();
318     long totalSize = 0;
319     TaskBatch batch = new TaskBatch();
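        // isMetaRecovery is tri-state: null when no filter was given (mixed logs), false for a
        // user region split, and flipped to true below when the meta filter was used, so that
        // removeRecoveringRegionsFromZK() knows which recovering region znodes to clean up.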
320     Boolean isMetaRecovery = (filter == null) ? null : false;
321     for (FileStatus lf : logfiles) {
322       // TODO If the log file is still being written to - which is most likely
323       // the case for the last log file - then its length will show up here
324       // as zero. The size of such a file can only be retrieved after
325       // recover-lease is done. totalSize will be under in most cases and the
326       // metrics that it drives will also be under-reported.
327       totalSize += lf.getLen();
328       String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
329       if (!enqueueSplitTask(pathToLog, batch)) {
330         throw new IOException("duplicate log split scheduled for " + lf.getPath());
331       }
332     }
333     waitForSplittingCompletion(batch, status);
334     // remove recovering regions from ZK
335     if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
336       // we split meta regions and user regions separately, therefore logfiles are either all
337       // for meta or all for user regions, never both (though tests could create mixed situations)
338       isMetaRecovery = true;
339     }
340     this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
341 
342     if (batch.done != batch.installed) {
343       batch.isDead = true;
344       SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet();
345       LOG.warn("error while splitting logs in " + logDirs +
346       " installed = " + batch.installed + " but only " + batch.done + " done");
347       String msg = "error or interrupted while splitting logs in "
348         + logDirs + " Task = " + batch;
349       status.abort(msg);
350       throw new IOException(msg);
351     }
352     for(Path logDir: logDirs){
353       status.setStatus("Cleaning up log directory...");
354       try {
355         if (fs.exists(logDir) && !fs.delete(logDir, false)) {
356           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
357         }
358       } catch (IOException ioe) {
359         FileStatus[] files = fs.listStatus(logDir);
360         if (files != null && files.length > 0) {
361           LOG.warn("returning success without actually splitting and " +
362               "deleting all the log files in path " + logDir);
363         } else {
364           LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
365         }
366       }
367       SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
368     }
369     String msg = "finished splitting (more than or equal to) " + totalSize +
370         " bytes in " + batch.installed + " log files in " + logDirs + " in " +
371         (EnvironmentEdgeManager.currentTimeMillis() - t) + "ms";
372     status.markComplete(msg);
373     LOG.info(msg);
374     return totalSize;
375   }
376 
377   /**
378    * Add a task entry to splitlog znode if it is not already there.
379    *
380    * @param taskname the path of the log to be split
381    * @param batch the batch this task belongs to
382    * @return true if a new entry is created, false if it is already there.
383    */
384   boolean enqueueSplitTask(String taskname, TaskBatch batch) {
385     SplitLogCounters.tot_mgr_log_split_start.incrementAndGet();
386     // This is a znode path under the splitlog dir with the rest of the path made up of an
387     // url encoding of the passed in log to split.
388     String path = ZKSplitLog.getEncodedNodeName(watcher, taskname);
389     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
390     Task oldtask = createTaskIfAbsent(path, batch);
391     if (oldtask == null) {
392       // publish the task in zk
393       createNode(path, zkretries);
394       return true;
395     }
396     return false;
397   }
398 
399   private void waitForSplittingCompletion(TaskBatch batch, MonitoredTask status) {
400     synchronized (batch) {
401       while ((batch.done + batch.error) != batch.installed) {
402         try {
403           status.setStatus("Waiting for distributed tasks to finish. "
404               + " scheduled=" + batch.installed
405               + " done=" + batch.done
406               + " error=" + batch.error);
407           int remaining = batch.installed - (batch.done + batch.error);
408           int actual = activeTasks(batch);
409           if (remaining != actual) {
410             LOG.warn("Expected " + remaining
411               + " active tasks, but actually there are " + actual);
412           }
413           int remainingInZK = remainingTasksInZK();
414           if (remainingInZK >= 0 && actual > remainingInZK) {
415             LOG.warn("Expected at least " + actual
416               + " tasks in ZK, but actually there are " + remainingInZK);
417           }
418           if (remainingInZK == 0 || actual == 0) {
419             LOG.warn("No more task remaining (ZK or task map), splitting "
420               + "should have completed. Remaining tasks in ZK " + remainingInZK
421               + ", active tasks in map " + actual);
422             if (remainingInZK == 0 && actual == 0) {
423               return;
424             }
425           }
426           batch.wait(100);
427           if (stopper.isStopped()) {
428             LOG.warn("Stopped while waiting for log splits to be completed");
429             return;
430           }
431         } catch (InterruptedException e) {
432           LOG.warn("Interrupted while waiting for log splits to be completed");
433           Thread.currentThread().interrupt();
434           return;
435         }
436       }
437     }
438   }
439 
440   private int activeTasks(final TaskBatch batch) {
441     int count = 0;
442     for (Task t: tasks.values()) {
443       if (t.batch == batch && t.status == TerminationStatus.IN_PROGRESS) {
444         count++;
445       }
446     }
447     return count;
448   }
449 
450   private int remainingTasksInZK() {
451     int count = 0;
452     try {
453       List<String> tasks =
454         ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
455       if (tasks != null) {
456         for (String t: tasks) {
457           if (!ZKSplitLog.isRescanNode(watcher, t)) {
458             count++;
459           }
460         }
461       }
462     } catch (KeeperException ke) {
463       LOG.warn("Failed to check remaining tasks", ke);
464       count = -1;
465     }
466     return count;
467   }
468 
469   /**
470    * It removes recovering regions under /hbase/recovering-regions/[encoded region name] so that the
471    * region server hosting the region can allow reads to the recovered region
472    * @param serverNames servers which are just recovered
473    * @param isMetaRecovery whether current recovery is for the meta region on
474    *          <code>serverNames</code>
475    */
476   private void
477       removeRecoveringRegionsFromZK(final Set<ServerName> serverNames, Boolean isMetaRecovery) {
478     if (this.recoveryMode != RecoveryMode.LOG_REPLAY) {
479       // the function is only used in WALEdit direct replay mode
480       return;
481     }
482 
483     final String metaEncodeRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedName();
484     int count = 0;
485     Set<String> recoveredServerNameSet = new HashSet<String>();
486     if (serverNames != null) {
487       for (ServerName tmpServerName : serverNames) {
488         recoveredServerNameSet.add(tmpServerName.getServerName());
489       }
490     }
491 
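        // With the lock held: if no split tasks remain and no dead servers are still being
        // processed, garbage collect every recovering region znode; otherwise only remove the
        // failed-server children (or whole region znodes) that the just-recovered servers cover.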
492     try {
493       this.recoveringRegionLock.lock();
494 
495       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
496       if (tasks != null) {
497         for (String t : tasks) {
498           if (!ZKSplitLog.isRescanNode(watcher, t)) {
499             count++;
500           }
501         }
502       }
503       if (count == 0 && this.master.isInitialized()
504           && !this.master.getServerManager().areDeadServersInProgress()) {
505         // no splitting work items left
506         deleteRecoveringRegionZNodes(watcher, null);
507         // reset lastRecoveringNodeCreationTime because we cleared all recovering znodes at
508         // this point.
509         lastRecoveringNodeCreationTime = Long.MAX_VALUE;
510       } else if (!recoveredServerNameSet.isEmpty()) {
511         // remove recovering regions which don't have any RS associated with them
512         List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
513         if (regions != null) {
514           for (String region : regions) {
515             if(isMetaRecovery != null) {
516               if ((isMetaRecovery && !region.equalsIgnoreCase(metaEncodeRegionName))
517                   || (!isMetaRecovery && region.equalsIgnoreCase(metaEncodeRegionName))) {
518                 // skip non-meta regions when recovering the meta region or
519                 // skip the meta region when recovering user regions
520                 continue;
521               }
522             }
523             String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
524             List<String> failedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
525             if (failedServers == null || failedServers.isEmpty()) {
526               ZKUtil.deleteNode(watcher, nodePath);
527               continue;
528             }
529             if (recoveredServerNameSet.containsAll(failedServers)) {
530               ZKUtil.deleteNodeRecursively(watcher, nodePath);
531             } else {
532               for (String failedServer : failedServers) {
533                 if (recoveredServerNameSet.contains(failedServer)) {
534                   String tmpPath = ZKUtil.joinZNode(nodePath, failedServer);
535                   ZKUtil.deleteNode(watcher, tmpPath);
536                 }
537               }
538             }
539           }
540         }
541       }
542     } catch (KeeperException ke) {
543       LOG.warn("removeRecoveringRegionsFromZK got zookeeper exception. Will retry", ke);
544       if (serverNames != null && !serverNames.isEmpty()) {
545         this.failedRecoveringRegionDeletions.add(new Pair<Set<ServerName>, Boolean>(serverNames,
546             isMetaRecovery));
547       }
548     } finally {
549       this.recoveringRegionLock.unlock();
550     }
551   }
552 
553   /**
554    * It removes stale recovering regions under /hbase/recovering-regions/[encoded region name]
555    * during master initialization phase.
556    * @param failedServers A set of known failed servers
557    * @throws KeeperException
558    */
559   void removeStaleRecoveringRegionsFromZK(final Set<ServerName> failedServers)
560       throws KeeperException, InterruptedIOException {
561 
562     Set<String> knownFailedServers = new HashSet<String>();
563     if (failedServers != null) {
564       for (ServerName tmpServerName : failedServers) {
565         knownFailedServers.add(tmpServerName.getServerName());
566       }
567     }
568 
569     this.recoveringRegionLock.lock();
570     try {
571       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
572       if (tasks != null) {
573         for (String t : tasks) {
574           byte[] data;
575           try {
576             data = ZKUtil.getData(this.watcher, ZKUtil.joinZNode(watcher.splitLogZNode, t));
577           } catch (InterruptedException e) {
578             throw new InterruptedIOException();
579           }
580           if (data != null) {
581             SplitLogTask slt = null;
582             try {
583               slt = SplitLogTask.parseFrom(data);
584             } catch (DeserializationException e) {
585               LOG.warn("Failed parse data for znode " + t, e);
586             }
587             if (slt != null && slt.isDone()) {
588               continue;
589             }
590           }
591           // decode the file name
592           t = ZKSplitLog.getFileName(t);
593           ServerName serverName = HLogUtil.getServerNameFromHLogDirectoryName(new Path(t));
594           if (serverName != null) {
595             knownFailedServers.add(serverName.getServerName());
596           } else {
597             LOG.warn("Found invalid WAL log file name: " + t);
598           }
599         }
600       }
601 
602       // remove recovering regions which don't have any RS associated with them
603       List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
604       if (regions != null) {
605         for (String region : regions) {
606           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
607           List<String> regionFailedServers = ZKUtil.listChildrenNoWatch(watcher, nodePath);
608           if (regionFailedServers == null || regionFailedServers.isEmpty()) {
609             ZKUtil.deleteNode(watcher, nodePath);
610             continue;
611           }
612           boolean needMoreRecovery = false;
613           for (String tmpFailedServer : regionFailedServers) {
614             if (knownFailedServers.contains(tmpFailedServer)) {
615               needMoreRecovery = true;
616               break;
617             }
618           }
619           if (!needMoreRecovery) {
620             ZKUtil.deleteNodeRecursively(watcher, nodePath);
621           }
622         }
623       }
624     } finally {
625       this.recoveringRegionLock.unlock();
626     }
627   }
628 
629   public static void deleteRecoveringRegionZNodes(ZooKeeperWatcher watcher, List<String> regions) {
630     try {
631       if (regions == null) {
632         // remove all children under /hbase/recovering-regions
633         LOG.debug("Garbage collecting all recovering region znodes");
634         ZKUtil.deleteChildrenRecursively(watcher, watcher.recoveringRegionsZNode);
635       } else {
636         for (String curRegion : regions) {
637           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, curRegion);
638           ZKUtil.deleteNodeRecursively(watcher, nodePath);
639         }
640       }
641     } catch (KeeperException e) {
642       LOG.warn("Cannot remove recovering regions from ZooKeeper", e);
643     }
644   }
645 
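      /**
       * Marks the in-memory task as finished with the given status, updates the owning batch's
       * done/error counters when one is attached, and then queues an asynchronous delete of the
       * task znode.
       */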
646   private void setDone(String path, TerminationStatus status) {
647     Task task = tasks.get(path);
648     if (task == null) {
649       if (!ZKSplitLog.isRescanNode(watcher, path)) {
650         SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet();
651         LOG.debug("unacquired orphan task is done " + path);
652       }
653     } else {
654       synchronized (task) {
655         if (task.status == IN_PROGRESS) {
656           if (status == SUCCESS) {
657             SplitLogCounters.tot_mgr_log_split_success.incrementAndGet();
658             LOG.info("Done splitting " + path);
659           } else {
660             SplitLogCounters.tot_mgr_log_split_err.incrementAndGet();
661             LOG.warn("Error splitting " + path);
662           }
663           task.status = status;
664           if (task.batch != null) {
665             synchronized (task.batch) {
666               if (status == SUCCESS) {
667                 task.batch.done++;
668               } else {
669                 task.batch.error++;
670               }
671               task.batch.notify();
672             }
673           }
674         }
675       }
676     }
677     // delete the task node in zk. It's an async
678     // call and no one is blocked waiting for this node to be deleted. All
679     // task names are unique (log.<timestamp>) so there is no risk of deleting
680     // a future task.
681     // if a deletion fails, TimeoutMonitor will retry the same deletion later
682     deleteNode(path, zkretries);
683     return;
684   }
685 
686   private void createNode(String path, Long retry_count) {
687     SplitLogTask slt = new SplitLogTask.Unassigned(serverName, this.recoveryMode);
688     ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), retry_count);
689     SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
690     return;
691   }
692 
693   private void createNodeSuccess(String path) {
694     LOG.debug("put up splitlog task at znode " + path);
695     getDataSetWatch(path, zkretries);
696   }
697 
698   private void createNodeFailure(String path) {
699     // TODO the Manager should split the log locally instead of giving up
700     LOG.warn("failed to create task node " + path);
701     setDone(path, FAILURE);
702   }
703 
704 
705   private void getDataSetWatch(String path, Long retry_count) {
706     this.watcher.getRecoverableZooKeeper().getZooKeeper().
707         getData(path, this.watcher,
708         new GetDataAsyncCallback(), retry_count);
709     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
710   }
711 
712   private void tryGetDataSetWatch(String path) {
713     // A negative retry count will lead to ignoring all error processing.
714     this.watcher.getRecoverableZooKeeper().getZooKeeper().
715         getData(path, this.watcher,
716         new GetDataAsyncCallback(), Long.valueOf(-1) /* retry count */);
717     SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet();
718   }
719 
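      /**
       * Dispatches on the task state read back from ZooKeeper: unassigned tasks may be resubmitted,
       * owned tasks record a heartbeat, resigned or errored tasks are resubmitted or failed, and
       * done tasks are run through the TaskFinisher before being marked SUCCESS.
       */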
720   private void getDataSetWatchSuccess(String path, byte[] data, int version)
721   throws DeserializationException {
722     if (data == null) {
723       if (version == Integer.MIN_VALUE) {
724         // assume all done. The task znode suddenly disappeared.
725         setDone(path, SUCCESS);
726         return;
727       }
728       SplitLogCounters.tot_mgr_null_data.incrementAndGet();
729       LOG.fatal("logic error - got null data " + path);
730       setDone(path, FAILURE);
731       return;
732     }
733     data = this.watcher.getRecoverableZooKeeper().removeMetaData(data);
734     SplitLogTask slt = SplitLogTask.parseFrom(data);
735     if (slt.isUnassigned()) {
736       LOG.debug("task not yet acquired " + path + " ver = " + version);
737       handleUnassignedTask(path);
738     } else if (slt.isOwned()) {
739       heartbeat(path, version, slt.getServerName());
740     } else if (slt.isResigned()) {
741       LOG.info("task " + path + " entered state: " + slt.toString());
742       resubmitOrFail(path, FORCE);
743     } else if (slt.isDone()) {
744       LOG.info("task " + path + " entered state: " + slt.toString());
745       if (taskFinisher != null && !ZKSplitLog.isRescanNode(watcher, path)) {
746         if (taskFinisher.finish(slt.getServerName(), ZKSplitLog.getFileName(path)) == Status.DONE) {
747           setDone(path, SUCCESS);
748         } else {
749           resubmitOrFail(path, CHECK);
750         }
751       } else {
752         setDone(path, SUCCESS);
753       }
754     } else if (slt.isErr()) {
755       LOG.info("task " + path + " entered state: " + slt.toString());
756       resubmitOrFail(path, CHECK);
757     } else {
758       LOG.fatal("logic error - unexpected zk state for path = " + path + " data = " + slt.toString());
759       setDone(path, FAILURE);
760     }
761   }
762 
763   private void getDataSetWatchFailure(String path) {
764     LOG.warn("failed to set data watch " + path);
765     setDone(path, FAILURE);
766   }
767 
768   /**
769    * It is possible for a task to stay in UNASSIGNED state indefinitely - say
770    * SplitLogManager wants to resubmit a task. It forces the task to UNASSIGNED
771    * state but it dies before it could create the RESCAN task node to signal
772    * the SplitLogWorkers to pick up the task. To prevent this scenario the
773    * SplitLogManager resubmits all orphan and UNASSIGNED tasks at startup.
774    *
775    * @param path
776    */
777   private void handleUnassignedTask(String path) {
778     if (ZKSplitLog.isRescanNode(watcher, path)) {
779       return;
780     }
781     Task task = findOrCreateOrphanTask(path);
782     if (task.isOrphan() && (task.incarnation == 0)) {
783       LOG.info("resubmitting unassigned orphan task " + path);
784       // ignore failure to resubmit. The timeout-monitor will handle it later
785       // albeit in a more crude fashion
786       resubmit(path, task, FORCE);
787     }
788   }
789 
790   /**
791    * Helper function to check whether to abandon retries in ZooKeeper AsyncCallback functions
792    * @param statusCode integer value of a ZooKeeper exception code
793    * @param action description message about the retried action
794    * @return true when retries should be abandoned, otherwise false
795    */
796   private boolean needAbandonRetries(int statusCode, String action) {
797     if (statusCode == KeeperException.Code.SESSIONEXPIRED.intValue()) {
798       LOG.error("ZK session expired. Master is expected to shut down. Abandoning retries for "
799           + "action=" + action);
800       return true;
801     }
802     return false;
803   }
804 
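      /**
       * Records a heartbeat for the task when the znode version differs from the last recorded
       * one; unchanged versions are treated as duplicate heartbeats and ignored.
       */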
805   private void heartbeat(String path, int new_version, ServerName workerName) {
806     Task task = findOrCreateOrphanTask(path);
807     if (new_version != task.last_version) {
808       if (task.isUnassigned()) {
809         LOG.info("task " + path + " acquired by " + workerName);
810       }
811       task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), new_version, workerName);
812       SplitLogCounters.tot_mgr_heartbeat.incrementAndGet();
813     } else {
814       // duplicate heartbeats - heartbeats w/o zk node version
815       // changing - are possible. The timeout thread does
816       // getDataSetWatch() just to check whether a node still
817       // exists or not
818     }
819     return;
820   }
821 
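      /**
       * Moves an IN_PROGRESS task back to the UNASSIGNED state so another worker can pick it up.
       * Unless the directive is FORCE, the resubmit is skipped while the current worker is still
       * alive and within the timeout, or once the unforced-resubmit threshold has been reached.
       */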
822   private boolean resubmit(String path, Task task, ResubmitDirective directive) {
823     // it's OK if this thread misses the update to task.status. It will fail later
824     if (task.status != IN_PROGRESS) {
825       return false;
826     }
827     int version;
828     if (directive != FORCE) {
829       // We're going to resubmit:
830       //  1) immediately if the worker server is now marked as dead
831       //  2) after a configurable timeout if the server is not marked as dead but has still not
832     //       finished the task. This allows us to continue if the worker cannot actually handle it,
833       //       for any reason.
834       final long time = EnvironmentEdgeManager.currentTimeMillis() - task.last_update;
835       final boolean alive = master.getServerManager() != null ?
836           master.getServerManager().isServerOnline(task.cur_worker_name) : true;
837       if (alive && time < timeout) {
838         LOG.trace("Skipping the resubmit of " + task.toString() + "  because the server " +
839             task.cur_worker_name + " is not marked as dead, we waited for " + time +
840             " while the timeout is " + timeout);
841         return false;
842       }
843       if (task.unforcedResubmits.get() >= resubmit_threshold) {
844         if (!task.resubmitThresholdReached) {
845           task.resubmitThresholdReached = true;
846           SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
847           LOG.info("Skipping resubmissions of task " + path +
848               " because threshold " + resubmit_threshold + " reached");
849         }
850         return false;
851       }
852       // race with heartbeat() that might be changing last_version
853       version = task.last_version;
854     } else {
855       SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet();
856       version = -1;
857     }
858     LOG.info("resubmitting task " + path);
859     task.incarnation++;
860     try {
861       // blocking zk call but this is done from the timeout thread
862       SplitLogTask slt = new SplitLogTask.Unassigned(this.serverName, this.recoveryMode);
863       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(), version) == false) {
864         LOG.debug("failed to resubmit task " + path +
865             " version changed");
866         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
867         return false;
868       }
869     } catch (NoNodeException e) {
870       LOG.warn("failed to resubmit because znode doesn't exist " + path +
871           " task done (or forced done by removing the znode)");
872       try {
873         getDataSetWatchSuccess(path, null, Integer.MIN_VALUE);
874       } catch (DeserializationException e1) {
875         LOG.debug("Failed to re-resubmit task " + path + " because of deserialization issue", e1);
876         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
877         return false;
878       }
879       return false;
880     } catch (KeeperException.BadVersionException e) {
881       LOG.debug("failed to resubmit task " + path + " version changed");
882       task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
883       return false;
884     } catch (KeeperException e) {
885       SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet();
886       LOG.warn("failed to resubmit " + path, e);
887       return false;
888     }
889     // don't count forced resubmits
890     if (directive != FORCE) {
891       task.unforcedResubmits.incrementAndGet();
892     }
893     task.setUnassigned();
894     createRescanNode(Long.MAX_VALUE);
895     SplitLogCounters.tot_mgr_resubmit.incrementAndGet();
896     return true;
897   }
898 
899   private void resubmitOrFail(String path, ResubmitDirective directive) {
900     if (resubmit(path, findOrCreateOrphanTask(path), directive) == false) {
901       setDone(path, FAILURE);
902     }
903   }
904 
905   private void deleteNode(String path, Long retries) {
906     SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet();
907     // Once a task znode is ready for delete, that is it is in the TASK_DONE
908     // state, then no one should be writing to it anymore. That is no one
909     // will be updating the znode version any more.
910     this.watcher.getRecoverableZooKeeper().getZooKeeper().
911       delete(path, -1, new DeleteAsyncCallback(),
912         retries);
913   }
914 
915   private void deleteNodeSuccess(String path) {
916     if (ignoreZKDeleteForTesting) {
917       return;
918     }
919     Task task;
920     task = tasks.remove(path);
921     if (task == null) {
922       if (ZKSplitLog.isRescanNode(watcher, path)) {
923         SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet();
924       }
925       SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet();
926       LOG.debug("deleted task without in memory state " + path);
927       return;
928     }
929     synchronized (task) {
930       task.status = DELETED;
931       task.notify();
932     }
933     SplitLogCounters.tot_mgr_task_deleted.incrementAndGet();
934   }
935 
936   private void deleteNodeFailure(String path) {
937     LOG.info("Failed to delete node " + path + " and will retry soon.");
938     return;
939   }
940 
941   /**
942    * signal the workers that a task was resubmitted by creating the
943    * RESCAN node.
944    * @throws KeeperException
945    */
946   private void createRescanNode(long retries) {
947     // The RESCAN node will be deleted almost immediately by the
948     // SplitLogManager as soon as it is created because it is being
949     // created in the DONE state. This behavior prevents a buildup
950     // of RESCAN nodes. But there is also a chance that a SplitLogWorker
951     // might miss the watch-trigger that creation of RESCAN node provides.
952     // Since the TimeoutMonitor will keep resubmitting UNASSIGNED tasks
953     // therefore this behavior is safe.
954     lastTaskCreateTime = EnvironmentEdgeManager.currentTimeMillis();
955     SplitLogTask slt = new SplitLogTask.Done(this.serverName, this.recoveryMode);
956     this.watcher.getRecoverableZooKeeper().getZooKeeper().
957       create(ZKSplitLog.getRescanNode(watcher), slt.toByteArray(),
958         Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
959         new CreateRescanAsyncCallback(), Long.valueOf(retries));
960   }
961 
962   private void createRescanSuccess(String path) {
963     SplitLogCounters.tot_mgr_rescan.incrementAndGet();
964     getDataSetWatch(path, zkretries);
965   }
966 
967   private void createRescanFailure() {
968     LOG.fatal("logic failure, rescan failure must not happen");
969   }
970 
971   /**
972    * @param path
973    * @param batch
974    * @return null on success, existing task on error
975    */
976   private Task createTaskIfAbsent(String path, TaskBatch batch) {
977     Task oldtask;
978     // batch.installed is only changed via this function and
979     // a single thread touches batch.installed.
980     Task newtask = new Task();
981     newtask.batch = batch;
982     oldtask = tasks.putIfAbsent(path, newtask);
983     if (oldtask == null) {
984       batch.installed++;
985       return  null;
986     }
987     // new task was not used.
988     synchronized (oldtask) {
989       if (oldtask.isOrphan()) {
990         if (oldtask.status == SUCCESS) {
991           // The task is already done. Do not install the batch for this
992           // task because it might be too late for setDone() to update
993           // batch.done. There is no need for the batch creator to wait for
994           // this task to complete.
995           return (null);
996         }
997         if (oldtask.status == IN_PROGRESS) {
998           oldtask.batch = batch;
999           batch.installed++;
1000           LOG.debug("Previously orphan task " + path + " is now being waited upon");
1001           return null;
1002         }
1003         while (oldtask.status == FAILURE) {
1004           LOG.debug("wait for status of task " + path + " to change to DELETED");
1005           SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet();
1006           try {
1007             oldtask.wait();
1008           } catch (InterruptedException e) {
1009             Thread.currentThread().interrupt();
1010             LOG.warn("Interrupted when waiting for znode delete callback");
1011             // fall through to return failure
1012             break;
1013           }
1014         }
1015         if (oldtask.status != DELETED) {
1016           LOG.warn("Failure because previously failed task" +
1017               " state still present. Waiting for znode delete callback" +
1018               " path=" + path);
1019           return oldtask;
1020         }
1021         // reinsert the newTask and it must succeed this time
1022         Task t = tasks.putIfAbsent(path, newtask);
1023         if (t == null) {
1024           batch.installed++;
1025           return  null;
1026         }
1027         LOG.fatal("Logic error. Deleted task still present in tasks map");
1028         assert false : "Deleted task still present in tasks map";
1029         return t;
1030       }
1031       LOG.warn("Failure because two threads can't wait for the same task; path=" + path);
1032       return oldtask;
1033     }
1034   }
1035 
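      /**
       * Returns the in-memory task for the given znode path, registering a new orphan task (one
       * without an associated TaskBatch) if none is being tracked yet.
       */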
1036   Task findOrCreateOrphanTask(String path) {
1037     Task orphanTask = new Task();
1038     Task task;
1039     task = tasks.putIfAbsent(path, orphanTask);
1040     if (task == null) {
1041       LOG.info("creating orphan task " + path);
1042       SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet();
1043       task = orphanTask;
1044     }
1045     return task;
1046   }
1047 
1048   @Override
1049   public void nodeDataChanged(String path) {
1050     Task task;
1051     task = tasks.get(path);
1052     if (task != null || ZKSplitLog.isRescanNode(watcher, path)) {
1053       if (task != null) {
1054         task.heartbeatNoDetails(EnvironmentEdgeManager.currentTimeMillis());
1055       }
1056       getDataSetWatch(path, zkretries);
1057     }
1058   }
1059 
1060   public void stop() {
1061     if (timeoutMonitor != null) {
1062       timeoutMonitor.interrupt();
1063     }
1064   }
1065 
1066   private void lookForOrphans() {
1067     List<String> orphans;
1068     try {
1069        orphans = ZKUtil.listChildrenNoWatch(this.watcher,
1070           this.watcher.splitLogZNode);
1071       if (orphans == null) {
1072         LOG.warn("could not get children of " + this.watcher.splitLogZNode);
1073         return;
1074       }
1075     } catch (KeeperException e) {
1076       LOG.warn("could not get children of " + this.watcher.splitLogZNode +
1077           " " + StringUtils.stringifyException(e));
1078       return;
1079     }
1080     int rescan_nodes = 0;
1081     for (String path : orphans) {
1082       String nodepath = ZKUtil.joinZNode(watcher.splitLogZNode, path);
1083       if (ZKSplitLog.isRescanNode(watcher, nodepath)) {
1084         rescan_nodes++;
1085         LOG.debug("found orphan rescan node " + path);
1086       } else {
1087         LOG.info("found orphan task " + path);
1088       }
1089       getDataSetWatch(nodepath, zkretries);
1090     }
1091     LOG.info("Found " + (orphans.size() - rescan_nodes) + " orphan tasks and " +
1092         rescan_nodes + " rescan nodes");
1093   }
1094 
1095   /**
1096    * Create znodes /hbase/recovering-regions/[region_ids...]/[failed region server names ...] for
1097    * all regions of the passed in region server
1098    * @param serverName the name of a region server
1099    * @param userRegions user regions assigned on the region server
1100    */
1101   void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
1102       throws KeeperException, InterruptedIOException {
1103     if (userRegions == null || (this.recoveryMode != RecoveryMode.LOG_REPLAY)) {
1104       return;
1105     }
1106 
1107     try {
1108       this.recoveringRegionLock.lock();
1109       // mark that we're creating recovering znodes
1110       this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
1111 
1112       for (HRegionInfo region : userRegions) {
1113         String regionEncodeName = region.getEncodedName();
1114         long retries = this.zkretries;
1115 
1116         do {
1117           String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
1118           long lastRecordedFlushedSequenceId = -1;
1119           try {
1120             long lastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
1121               regionEncodeName.getBytes());
1122 
1123             /*
1124              * znode layout: .../region_id[last known flushed sequence id]/failed server[last known
1125              * flushed sequence id for the server]
1126              */
1127             byte[] data = ZKUtil.getData(this.watcher, nodePath);
1128             if (data == null) {
1129               ZKUtil.createSetData(this.watcher, nodePath,
1130                 ZKUtil.positionToByteArray(lastSequenceId));
1131             } else {
1132               lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
1133               if (lastRecordedFlushedSequenceId < lastSequenceId) {
1134                 // update last flushed sequence id in the region level
1135                 ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
1136               }
1137             }
1138             // go one level deeper with server name
1139             nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
1140             if (lastSequenceId <= lastRecordedFlushedSequenceId) {
1141               // the newly assigned RS failed even before any flush to the region
1142               lastSequenceId = lastRecordedFlushedSequenceId;
1143             }
1144             ZKUtil.createSetData(this.watcher, nodePath,
1145               ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
1146             LOG.debug("Mark region " + regionEncodeName + " recovering from failed region server "
1147                 + serverName);
1148 
1149             // break retry loop
1150             break;
1151           } catch (KeeperException e) {
1152             // ignore ZooKeeper exceptions inside retry loop
1153             if (retries <= 1) {
1154               throw e;
1155             }
1156             // wait a little bit for retry
1157             try {
1158               Thread.sleep(20);
1159             } catch (InterruptedException e1) {
1160               throw new InterruptedIOException();
1161             }
1162           } catch (InterruptedException e) {
1163             throw new InterruptedIOException();
1164           }
1165         } while ((--retries) > 0 && (!this.stopper.isStopped()));
1166       }
1167     } finally {
1168       this.recoveringRegionLock.unlock();
1169     }
1170   }
1171 
1172   /**
1173    * @param bytes - Content of a failed region server or recovering region znode.
1174    * @return long - The last flushed sequence Id for the region server
1175    */
1176   public static long parseLastFlushedSequenceIdFrom(final byte[] bytes) {
1177     long lastRecordedFlushedSequenceId = -1l;
1178     try {
1179       lastRecordedFlushedSequenceId = ZKUtil.parseHLogPositionFrom(bytes);
1180     } catch (DeserializationException e) {
1181       lastRecordedFlushedSequenceId = -1l;
1182       LOG.warn("Can't parse last flushed sequence Id", e);
1183     }
1184     return lastRecordedFlushedSequenceId;
1185   }
1186 
1187   /**
1188    * Check if /hbase/recovering-regions/<current region encoded name> exists. Returns true if it
1189    * exists and sets a watcher as well.
1190    * @param zkw
1191    * @param regionEncodedName region encode name
1192    * @return true when /hbase/recovering-regions/<current region encoded name> exists
1193    * @throws KeeperException
1194    */
1195   public static boolean
1196       isRegionMarkedRecoveringInZK(ZooKeeperWatcher zkw, String regionEncodedName)
1197           throws KeeperException {
1198     boolean result = false;
1199     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, regionEncodedName);
1200 
1201     byte[] node = ZKUtil.getDataAndWatch(zkw, nodePath);
1202     if (node != null) {
1203       result = true;
1204     }
1205     return result;
1206   }
1207 
1208   /**
1209    * This function is used in distributedLogReplay to fetch last flushed sequence id from ZK
1210    * @param zkw
1211    * @param serverName
1212    * @param encodedRegionName
1213    * @return the last flushed sequence ids recorded in ZK of the region for <code>serverName</code>
1214    * @throws IOException
1215    */
1216   public static RegionStoreSequenceIds getRegionFlushedSequenceId(ZooKeeperWatcher zkw,
1217       String serverName, String encodedRegionName) throws IOException {
1218     // when SplitLogWorker recovers a region by directly replaying unflushed WAL edits,
1219     // last flushed sequence Id changes when newly assigned RS flushes writes to the region.
1220     // If the newly assigned RS fails again (a chained RS failure scenario), the last flushed
1221     // sequence Id name space (sequence Id only valid for a particular RS instance), changes
1222     // when different newly assigned RS flushes the region.
1223     // Therefore, in this mode we need to fetch last sequence Ids from ZK where we keep history of
1224     // last flushed sequence Id for each failed RS instance.
1225     RegionStoreSequenceIds result = null;
1226     String nodePath = ZKUtil.joinZNode(zkw.recoveringRegionsZNode, encodedRegionName);
1227     nodePath = ZKUtil.joinZNode(nodePath, serverName);
1228     try {
1229       byte[] data;
1230       try {
1231         data = ZKUtil.getData(zkw, nodePath);
1232       } catch (InterruptedException e) {
1233         throw new InterruptedIOException();
1234       }
1235       if (data != null) {
1236         result = ZKUtil.parseRegionStoreSequenceIds(data);
1237       }
1238     } catch (KeeperException e) {
1239       throw new IOException("Cannot get lastFlushedSequenceId from ZooKeeper for server="
1240           + serverName + "; region=" + encodedRegionName, e);
1241     } catch (DeserializationException e) {
1242       LOG.warn("Can't parse last flushed sequence Id from znode:" + nodePath, e);
1243     }
1244     return result;
1245   }
1246   
1247   /**
1248    * Sets the recovery mode based on split log tasks outstanding from before the master started,
1249    * or otherwise on the current configuration setting
1250    * @param isForInitialization
1251    * @throws KeeperException
1252    * @throws InterruptedIOException
1253    */
1254   public void setRecoveryMode(boolean isForInitialization) throws KeeperException,
1255       InterruptedIOException {
1256     if(this.isDrainingDone) {
1257       // when there are no outstanding splitlogtasks after master start up, we already have the
1258       // up-to-date recovery mode
1259       return;
1260     }
1261     if(this.watcher == null) {
1262       // when the watcher is null (testing code), the recovery mode can only be LOG_SPLITTING
1263       this.isDrainingDone = true;
1264       this.recoveryMode = RecoveryMode.LOG_SPLITTING;
1265       return;
1266     }
1267     boolean hasSplitLogTask = false;
1268     boolean hasRecoveringRegions = false;
1269     RecoveryMode previousRecoveryMode = RecoveryMode.UNKNOWN;
1270     RecoveryMode recoveryModeInConfig = (isDistributedLogReplay(conf)) ? 
1271       RecoveryMode.LOG_REPLAY : RecoveryMode.LOG_SPLITTING;
1272 
1273     // Firstly check if there are outstanding recovering regions
1274     List<String> regions = ZKUtil.listChildrenNoWatch(watcher, watcher.recoveringRegionsZNode);
1275     if (regions != null && !regions.isEmpty()) {
1276       hasRecoveringRegions = true;
1277       previousRecoveryMode = RecoveryMode.LOG_REPLAY;
1278     }
1279     if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1280       // Secondly check if there are outstanding split log tasks
1281       List<String> tasks = ZKUtil.listChildrenNoWatch(watcher, watcher.splitLogZNode);
1282       if (tasks != null && !tasks.isEmpty()) {
1283         hasSplitLogTask = true;
1284         if (isForInitialization) {
1285           // during initialization, try to get recovery mode from splitlogtask
1286           for (String task : tasks) {
1287             try {
1288               byte[] data = ZKUtil.getData(this.watcher,
1289                 ZKUtil.joinZNode(watcher.splitLogZNode, task));
1290               if (data == null) continue;
1291               SplitLogTask slt = SplitLogTask.parseFrom(data);
1292               previousRecoveryMode = slt.getMode();
1293               if (previousRecoveryMode == RecoveryMode.UNKNOWN) {
1294                 // created by an old code base where we didn't set recovery mode in splitlogtask;
1295                 // we can safely set it to LOG_SPLITTING because we're in master initialization code
1296                 // before SSH is enabled & there are no outstanding recovering regions
1297                 previousRecoveryMode = RecoveryMode.LOG_SPLITTING;
1298               }
1299               break;
1300             } catch (DeserializationException e) {
1301               LOG.warn("Failed parse data for znode " + task, e);
1302             } catch (InterruptedException e) {
1303               throw new InterruptedIOException();
1304             }
1305           }
1306         }
1307       }
1308     }
1309 
1310     synchronized(this) {
1311       if(this.isDrainingDone) {
1312         return;
1313       }
1314       if (!hasSplitLogTask && !hasRecoveringRegions) {
1315         this.isDrainingDone = true;
1316         this.recoveryMode = recoveryModeInConfig;
1317         return;
1318       } else if (!isForInitialization) {
1319         // splitlogtask hasn't drained yet, keep existing recovery mode
1320         return;
1321       }
1322   
1323       if (previousRecoveryMode != RecoveryMode.UNKNOWN) {
1324         this.isDrainingDone = (previousRecoveryMode == recoveryModeInConfig);
1325         this.recoveryMode = previousRecoveryMode;
1326       } else {
1327         this.recoveryMode = recoveryModeInConfig;
1328       }
1329     }
1330   }
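  // Illustrative call sequence (not part of the original source): a sketch of how the two entry
  // points are typically exercised; the surrounding wiring is an assumption, only
  // setRecoveryMode()/getRecoveryMode() come from this class.
  //
  //   splitLogManager.setRecoveryMode(true);   // at master initialization, honor old tasks
  //   // ... once outstanding split log tasks and recovering regions drain ...
  //   splitLogManager.setRecoveryMode(false);  // switch to the configured mode
  //   RecoveryMode current = splitLogManager.getRecoveryMode();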
1331 
1332   public RecoveryMode getRecoveryMode() {
1333     return this.recoveryMode;
1334   }
1335   
1336   /**
1337    * Returns whether distributed log replay is turned on
1338    * @param conf the configuration to check
1339    * @return true when distributed log replay is turned on
1340    */
1341   private boolean isDistributedLogReplay(Configuration conf) {
1342     boolean dlr = conf.getBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY,
1343       HConstants.DEFAULT_DISTRIBUTED_LOG_REPLAY_CONFIG);
1344     int version = conf.getInt(HFile.FORMAT_VERSION_KEY, HFile.MAX_FORMAT_VERSION);
1345     if (LOG.isDebugEnabled()) {
1346       LOG.debug("Distributed log replay=" + dlr + ", " + HFile.FORMAT_VERSION_KEY + "=" + version);
1347     }
1348     // For distributed log replay, hfile version must be 3 at least; we need tag support.
1349     return dlr && (version >= 3);
1350   }
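  // Illustrative configuration sketch (not part of the original source): both settings below
  // must hold for LOG_REPLAY to be chosen; HBaseConfiguration.create() is just one way to obtain
  // a Configuration instance.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setBoolean(HConstants.DISTRIBUTED_LOG_REPLAY_KEY, true);
  //   conf.setInt(HFile.FORMAT_VERSION_KEY, 3);   // tag support requires HFile version >= 3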
1351 
1352   /**
1353    * Keeps track of the batch of tasks submitted together by a caller in splitLogDistributed().
1354    * Client threads use this object to wait for all their tasks to be done.
1355    * <p>
1356    * All access is synchronized.
1357    */
1358   static class TaskBatch {
1359     int installed = 0;
1360     int done = 0;
1361     int error = 0;
1362     volatile boolean isDead = false;
1363 
1364     @Override
1365     public String toString() {
1366       return ("installed = " + installed + " done = " + done + " error = " + error);
1367     }
1368   }
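  // Illustrative sketch (not part of the original source) of how a caller such as
  // splitLogDistributed() can wait on a TaskBatch; the exact waiting loop used elsewhere in this
  // class may differ.
  //
  //   synchronized (batch) {
  //     while ((batch.done + batch.error) != batch.installed) {
  //       batch.wait(100);   // InterruptedException handling omitted for brevity
  //     }
  //   }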
1369 
1370   /**
1371    * In-memory state of an active task.
1372    */
1373   static class Task {
1374     volatile long last_update;
1375     volatile int last_version;
1376     volatile ServerName cur_worker_name;
1377     volatile TaskBatch batch;
1378     volatile TerminationStatus status;
1379     volatile int incarnation;
1380     final AtomicInteger unforcedResubmits = new AtomicInteger();
1381     volatile boolean resubmitThresholdReached;
1382 
1383     @Override
1384     public String toString() {
1385       return ("last_update = " + last_update +
1386           " last_version = " + last_version +
1387           " cur_worker_name = " + cur_worker_name +
1388           " status = " + status +
1389           " incarnation = " + incarnation +
1390           " resubmits = " + unforcedResubmits.get() +
1391           " batch = " + batch);
1392     }
1393 
1394     Task() {
1395       incarnation = 0;
1396       last_version = -1;
1397       status = IN_PROGRESS;
1398       setUnassigned();
1399     }
1400 
1401     public boolean isOrphan() {
1402       return (batch == null || batch.isDead);
1403     }
1404 
1405     public boolean isUnassigned() {
1406       return (cur_worker_name == null);
1407     }
1408 
1409     public void heartbeatNoDetails(long time) {
1410       last_update = time;
1411     }
1412 
1413     public void heartbeat(long time, int version, ServerName worker) {
1414       last_version = version;
1415       last_update = time;
1416       cur_worker_name = worker;
1417     }
1418 
1419     public void setUnassigned() {
1420       cur_worker_name = null;
1421       last_update = -1;
1422     }
1423   }
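  // Illustrative sketch (not part of the original source): how the manager records progress on a
  // Task when a worker updates the task znode; the stat/workerName values are hypothetical.
  //
  //   Task task = tasks.get(path);
  //   task.heartbeat(EnvironmentEdgeManager.currentTimeMillis(), stat.getVersion(), workerName);
  //   boolean claimed = !task.isUnassigned();   // true once a worker owns the task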
1424 
1425   void handleDeadWorker(ServerName workerName) {
1426     // resubmit the tasks on the TimeoutMonitor thread. Makes it easier
1427     // to reason about concurrency. Makes it easier to retry.
1428     synchronized (deadWorkersLock) {
1429       if (deadWorkers == null) {
1430         deadWorkers = new HashSet<ServerName>(100);
1431       }
1432       deadWorkers.add(workerName);
1433     }
1434     LOG.info("dead splitlog worker " + workerName);
1435   }
1436 
1437   void handleDeadWorkers(Set<ServerName> serverNames) {
1438     synchronized (deadWorkersLock) {
1439       if (deadWorkers == null) {
1440         deadWorkers = new HashSet<ServerName>(100);
1441       }
1442       deadWorkers.addAll(serverNames);
1443     }
1444     LOG.info("dead splitlog workers " + serverNames);
1445   }
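  // Illustrative sketch (not part of the original source): server-shutdown handling can report
  // expired servers here so their tasks get force-resubmitted on the next TimeoutMonitor run;
  // the server name is a hypothetical placeholder.
  //
  //   Set<ServerName> expired = new HashSet<ServerName>();
  //   expired.add(ServerName.valueOf("rs1.example.com,60020,1400000000000"));
  //   splitLogManager.handleDeadWorkers(expired);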
1446 
1447   /**
1448    * Periodically checks all active tasks and resubmits the ones that have timed
1449    * out
1450    */
1451   private class TimeoutMonitor extends Chore {
1452     private long lastLog = 0;
1453 
1454     public TimeoutMonitor(final int period, Stoppable stopper) {
1455       super("SplitLogManager Timeout Monitor", period, stopper);
1456     }
1457 
1458     @Override
1459     protected void chore() {
1460       int resubmitted = 0;
1461       int unassigned = 0;
1462       int tot = 0;
1463       boolean found_assigned_task = false;
1464       Set<ServerName> localDeadWorkers;
1465 
1466       synchronized (deadWorkersLock) {
1467         localDeadWorkers = deadWorkers;
1468         deadWorkers = null;
1469       }
1470 
1471       for (Map.Entry<String, Task> e : tasks.entrySet()) {
1472         String path = e.getKey();
1473         Task task = e.getValue();
1474         ServerName cur_worker = task.cur_worker_name;
1475         tot++;
1476         // don't easily resubmit a task which hasn't been picked up yet. It
1477         // might be a long while before a SplitLogWorker is free to pick up a
1478         // task. This is because a SplitLogWorker picks up a task one at a
1479         // time. If we want progress when there are no region servers then we
1480         // will have to run a SplitLogWorker thread in the Master.
1481         if (task.isUnassigned()) {
1482           unassigned++;
1483           continue;
1484         }
1485         found_assigned_task = true;
1486         if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
1487           SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet();
1488           if (resubmit(path, task, FORCE)) {
1489             resubmitted++;
1490           } else {
1491             handleDeadWorker(cur_worker);
1492             LOG.warn("Failed to resubmit task " + path + " owned by dead " +
1493                 cur_worker + ", will retry.");
1494           }
1495         } else if (resubmit(path, task, CHECK)) {
1496           resubmitted++;
1497         }
1498       }
1499       if (tot > 0) {
1500         long now = EnvironmentEdgeManager.currentTimeMillis();
1501         if (now > lastLog + 5000) {
1502           lastLog = now;
1503           LOG.info("total tasks = " + tot + " unassigned = " + unassigned + " tasks=" + tasks);
1504         }
1505       }
1506       if (resubmitted > 0) {
1507         LOG.info("resubmitted " + resubmitted + " out of " + tot + " tasks");
1508       }
1509       // If there are pending tasks and all of them have been unassigned for
1510       // some time then put up a RESCAN node to ping the workers.
1511       // ZKSplitlog.DEFAULT_UNASSIGNED_TIMEOUT is of the order of minutes
1512       // because a. it is very unlikely that every worker had a
1513       // transient error when trying to grab the task b. if there are no
1514     // workers then all tasks will stay unassigned indefinitely and the
1515     // manager will be indefinitely creating RESCAN nodes. TODO maybe the
1516       // master should spawn both a manager and a worker thread to guarantee
1517       // that there is always one worker in the system
1518       if (tot > 0 && !found_assigned_task &&
1519           ((EnvironmentEdgeManager.currentTimeMillis() - lastTaskCreateTime) >
1520           unassignedTimeout)) {
1521         for (Map.Entry<String, Task> e : tasks.entrySet()) {
1522           String path = e.getKey();
1523           Task task = e.getValue();
1524           // we have to do task.isUnassigned() check again because tasks might
1525           // have been asynchronously assigned. There is no locking required
1526           // for these checks ... it is OK even if tryGetDataSetWatch() is
1527           // called unnecessarily for a task
1528           if (task.isUnassigned() && (task.status != FAILURE)) {
1529             // We just touch the znode to make sure its still there
1530             tryGetDataSetWatch(path);
1531           }
1532         }
1533         createRescanNode(Long.MAX_VALUE);
1534         SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet();
1535         LOG.debug("resubmitting unassigned task(s) after timeout");
1536       }
1537 
1538       // Retry previously failed deletes
1539       if (failedDeletions.size() > 0) {
1540         List<String> tmpPaths = new ArrayList<String>(failedDeletions);
1541         for (String tmpPath : tmpPaths) {
1542           // deleteNode is an async call
1543           deleteNode(tmpPath, zkretries);
1544         }
1545         failedDeletions.removeAll(tmpPaths);
1546       }
1547 
1548       // Garbage collect left-over /hbase/recovering-regions/... znode
1549       long timeInterval = EnvironmentEdgeManager.currentTimeMillis()
1550           - lastRecoveringNodeCreationTime;
1551       if (!failedRecoveringRegionDeletions.isEmpty()
1552           || (tot == 0 && tasks.size() == 0 && (timeInterval > checkRecoveringTimeThreshold))) {
1553         // removeRecoveringRegionsFromZK() does more checks internally before GCing anything
1554         if (!failedRecoveringRegionDeletions.isEmpty()) {
1555           List<Pair<Set<ServerName>, Boolean>> previouslyFailedDeletions =
1556               new ArrayList<Pair<Set<ServerName>, Boolean>>(failedRecoveringRegionDeletions);
1557           failedRecoveringRegionDeletions.removeAll(previouslyFailedDeletions);
1558           for (Pair<Set<ServerName>, Boolean> failedDeletion : previouslyFailedDeletions) {
1559             removeRecoveringRegionsFromZK(failedDeletion.getFirst(), failedDeletion.getSecond());
1560           }
1561         } else {
1562           removeRecoveringRegionsFromZK(null, null);
1563         }
1564       }
1565     }
1566   }
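  // Illustrative sketch (not part of the original source): as a Chore, the monitor is normally
  // started as a daemon thread with the configured period; the period value and thread name
  // below are assumptions about the surrounding wiring.
  //
  //   TimeoutMonitor monitor = new TimeoutMonitor(1000, stopper);
  //   Threads.setDaemonThreadRunning(monitor.getThread(),
  //     serverName + ".splitLogManagerTimeoutMonitor");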
1567 
1568   /**
1569    * Asynchronous handler for zk create node results.
1570    * Retries on failures.
1571    */
1572   class CreateAsyncCallback implements AsyncCallback.StringCallback {
1573     private final Log LOG = LogFactory.getLog(CreateAsyncCallback.class);
1574 
1575     @Override
1576     public void processResult(int rc, String path, Object ctx, String name) {
1577       SplitLogCounters.tot_mgr_node_create_result.incrementAndGet();
1578       if (rc != 0) {
1579         if (needAbandonRetries(rc, "Create znode " + path)) {
1580           createNodeFailure(path);
1581           return;
1582         }
1583         if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
1584           // What if there is a delete pending against this pre-existing
1585           // znode? Then this soon-to-be-deleted task znode must be in TASK_DONE
1586           // state. Only operations that will be carried out on this node by
1587           // this manager are get-znode-data, task-finisher and delete-znode.
1588           // And all code pieces correctly handle the case of suddenly
1589           // disappearing task-znode.
1590           LOG.debug("found pre-existing znode " + path);
1591           SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet();
1592         } else {
1593           Long retry_count = (Long)ctx;
1594           LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " +
1595               path + " remaining retries=" + retry_count);
1596           if (retry_count == 0) {
1597             SplitLogCounters.tot_mgr_node_create_err.incrementAndGet();
1598             createNodeFailure(path);
1599           } else {
1600             SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet();
1601             createNode(path, retry_count - 1);
1602           }
1603           return;
1604         }
1605       }
1606       createNodeSuccess(path);
1607     }
1608   }
1609 
1610   /**
1611    * Asynchronous handler for zk get-data-set-watch on node results.
1612    * Retries on failures.
1613    */
1614   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
1615     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
1616 
1617     @Override
1618     public void processResult(int rc, String path, Object ctx, byte[] data,
1619         Stat stat) {
1620       SplitLogCounters.tot_mgr_get_data_result.incrementAndGet();
1621       if (rc != 0) {
1622         if (needAbandonRetries(rc, "GetData from znode " + path)) {
1623           return;
1624         }
1625         if (rc == KeeperException.Code.NONODE.intValue()) {
1626           SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet();
1627           LOG.warn("task znode " + path + " vanished or not created yet.");
1628           // ignore since we should not end up in a case where there is in-memory task,
1629           // but no znode. The only case is between the time task is created in-memory
1630           // and the znode is created. See HBASE-11217.
1631           return;
1632         }
1633         Long retry_count = (Long) ctx;
1634 
1635         if (retry_count < 0) {
1636           LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1637               path + ". Ignoring error. No error handling. No retrying.");
1638           return;
1639         }
1640         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " +
1641             path + " remaining retries=" + retry_count);
1642         if (retry_count == 0) {
1643           SplitLogCounters.tot_mgr_get_data_err.incrementAndGet();
1644           getDataSetWatchFailure(path);
1645         } else {
1646           SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet();
1647           getDataSetWatch(path, retry_count - 1);
1648         }
1649         return;
1650       }
1651       try {
1652         getDataSetWatchSuccess(path, data, stat.getVersion());
1653       } catch (DeserializationException e) {
1654         LOG.warn("Deserialization problem", e);
1655       }
1656       return;
1657     }
1658   }
1659 
1660   /**
1661    * Asynchronous handler for zk delete node results.
1662    * Retries on failures.
1663    */
1664   class DeleteAsyncCallback implements AsyncCallback.VoidCallback {
1665     private final Log LOG = LogFactory.getLog(DeleteAsyncCallback.class);
1666 
1667     @Override
1668     public void processResult(int rc, String path, Object ctx) {
1669       SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet();
1670       if (rc != 0) {
1671         if (needAbandonRetries(rc, "Delete znode " + path)) {
1672           failedDeletions.add(path);
1673           return;
1674         }
1675         if (rc != KeeperException.Code.NONODE.intValue()) {
1676           SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet();
1677           Long retry_count = (Long) ctx;
1678           LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " +
1679               path + " remaining retries=" + retry_count);
1680           if (retry_count == 0) {
1681             LOG.warn("delete failed " + path);
1682             failedDeletions.add(path);
1683             deleteNodeFailure(path);
1684           } else {
1685             deleteNode(path, retry_count - 1);
1686           }
1687           return;
1688         } else {
1689           LOG.info(path +
1690             " does not exist. Either was created but deleted behind our" +
1691             " back by another pending delete OR was deleted" +
1692             " in earlier retry rounds. zkretries = " + (Long) ctx);
1693         }
1694       } else {
1695         LOG.debug("deleted " + path);
1696       }
1697       deleteNodeSuccess(path);
1698     }
1699   }
1700 
1701   /**
1702    * Asynchronous handler for zk create RESCAN-node results.
1703    * Retries on failures.
1704    * <p>
1705    * A RESCAN node is created using PERSISTENT_SEQUENTIAL flag. It is a signal
1706    * for all the {@link SplitLogWorker}s to rescan for new tasks.
1707    */
1708   class CreateRescanAsyncCallback implements AsyncCallback.StringCallback {
1709     private final Log LOG = LogFactory.getLog(CreateRescanAsyncCallback.class);
1710 
1711     @Override
1712     public void processResult(int rc, String path, Object ctx, String name) {
1713       if (rc != 0) {
1714         if (needAbandonRetries(rc, "CreateRescan znode " + path)) {
1715           return;
1716         }
1717         Long retry_count = (Long)ctx;
1718         LOG.warn("rc=" + KeeperException.Code.get(rc) + " for "+ path +
1719             " remaining retries=" + retry_count);
1720         if (retry_count == 0) {
1721           createRescanFailure();
1722         } else {
1723           createRescanNode(retry_count - 1);
1724         }
1725         return;
1726       }
1727       // path is the original arg, name is the actual name that was created
1728       createRescanSuccess(name);
1729     }
1730   }
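  // Illustrative sketch (not part of the original source): an async PERSISTENT_SEQUENTIAL create
  // that would route its result to this callback; the exact call made by createRescanNode() is
  // an assumption here.
  //
  //   watcher.getRecoverableZooKeeper().getZooKeeper().create(rescanPath, data,
  //     ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,
  //     new CreateRescanAsyncCallback(), Long.valueOf(retries));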
1731 
1732   /**
1733    * {@link SplitLogManager} can use objects implementing this interface to
1734    * finish off a partially done task by {@link SplitLogWorker}. This provides
1735    * a serialization point at the end of the task processing. Must be
1736    * restartable and idempotent.
1737    */
1738   public interface TaskFinisher {
1739     /**
1740      * Status that can be returned by finish()
1741      */
1742     enum Status {
1743       /**
1744        * task completed successfully
1745        */
1746       DONE(),
1747       /**
1748        * task completed with error
1749        */
1750       ERR();
1751     }
1752     /**
1753      * Finish the partially done task. workerName provides a clue to where the
1754      * partial results of the partially done task are present. taskname is the
1755      * name of the task that was put up in zookeeper.
1756      * <p>
1757      * @param workerName the worker that was last working on the task
1758      * @param taskname name of the task znode
1759      * @return DONE if task completed successfully, ERR otherwise
1760      */
1761     Status finish(ServerName workerName, String taskname);
1762   }
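  // Illustrative sketch (not part of the original source): a minimal TaskFinisher. The
  // HLogSplitter call is a stand-in for whatever finishing work an installed finisher does.
  //
  //   TaskFinisher finisher = new TaskFinisher() {
  //     @Override
  //     public Status finish(ServerName workerName, String logfile) {
  //       try {
  //         HLogSplitter.finishSplitLogFile(logfile, conf);
  //       } catch (IOException e) {
  //         return Status.ERR;
  //       }
  //       return Status.DONE;
  //     }
  //   };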
1763 
1764   enum ResubmitDirective {
1765     CHECK(),
1766     FORCE();
1767   }
1768 
1769   enum TerminationStatus {
1770     IN_PROGRESS("in_progress"),
1771     SUCCESS("success"),
1772     FAILURE("failure"),
1773     DELETED("deleted");
1774 
1775     String statusMsg;
1776     TerminationStatus(String msg) {
1777       statusMsg = msg;
1778     }
1779 
1780     @Override
1781     public String toString() {
1782       return statusMsg;
1783     }
1784   }
1785 }