1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.ConnectException;
24  import java.net.SocketTimeoutException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicLong;
29  
30  import org.apache.commons.logging.Log;
31  import org.apache.commons.logging.LogFactory;
32  import org.apache.hadoop.classification.InterfaceAudience;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.ServerName;
37  import org.apache.hadoop.hbase.SplitLogCounters;
38  import org.apache.hadoop.hbase.SplitLogTask;
39  import org.apache.hadoop.hbase.client.HConnectionManager;
40  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
41  import org.apache.hadoop.hbase.exceptions.DeserializationException;
42  import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
43  import org.apache.hadoop.hbase.master.SplitLogManager;
44  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
45  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
46  import org.apache.hadoop.hbase.util.CancelableProgressable;
47  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
48  import org.apache.hadoop.hbase.util.FSUtils;
49  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
50  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
51  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
52  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
53  import org.apache.hadoop.util.StringUtils;
54  import org.apache.zookeeper.AsyncCallback;
55  import org.apache.zookeeper.KeeperException;
56  import org.apache.zookeeper.data.Stat;
57  
58  /**
59   * This worker is spawned in every regionserver (should we also spawn one in
60   * the master?). The Worker waits for log splitting tasks to be put up by the
61   * {@link SplitLogManager} running in the master and races with other workers
 * in other servers to acquire those tasks. The coordination is done via
63   * zookeeper. All the action takes place at /hbase/splitlog znode.
64   * <p>
65   * If a worker has successfully moved the task from state UNASSIGNED to
 * OWNED then it owns the task. It keeps heart beating the manager by
 * periodically moving the task from OWNED to OWNED state. On success it
68   * moves the task to TASK_DONE. On unrecoverable error it moves task state to
69   * ERR. If it cannot continue but wants the master to retry the task then it
70   * moves the task state to RESIGNED.
71   * <p>
72   * The manager can take a task away from a worker by moving the task from
 * OWNED to UNASSIGNED. In the absence of a global lock there is an
 * unavoidable race here - a worker might have just finished its task when it
 * is stripped of its ownership. Here we rely on the idempotency of the log
 * splitting task for correctness.
77   */
78  @InterfaceAudience.Private
79  public class SplitLogWorker extends ZooKeeperListener implements Runnable {
  // Logger shared by this class and the anonymous TaskExecutor created in the constructor.
  private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
  // Upper bound, in milliseconds, of each wait on taskReadyLock in taskLoop().
  private static final int checkInterval = 5000; // 5 seconds

  // Thread running this Runnable; assigned in start() and interrupted by stopTask().
  Thread worker;
  // Identity of this worker; written into the Owned/Done/Err/Resigned task states.
  private final ServerName serverName;
  // Performs the actual work for a task once grabTask() has acquired it.
  private final TaskExecutor splitTaskExecutor;

  // Monitor that taskLoop() waits on and nodeChildrenChanged() notifies.
  private final Object taskReadyLock = new Object();
  // Incremented by nodeChildrenChanged(); taskLoop() compares it against a snapshot
  // to detect that the children of the splitlog znode changed.
  volatile int taskReadySeq = 0;
  // Znode path of the task being processed; set in grabTask(), cleared in endTask().
  private volatile String currentTask = null;
  // Znode version from the most recent read or successful setData on currentTask;
  // passed as the expected version on the next setData.
  private int currentVersion;
  // Set by stop() (and by the interrupt handlers) to make the worker thread exit.
  private volatile boolean exitWorker;
  // Guards workerInGrabTask; every access in this class is inside synchronized (grabTaskLock).
  private final Object grabTaskLock = new Object();
  // True while the worker thread is inside grabTask().
  private boolean workerInGrabTask = false;
  // Minimum number of milliseconds between two heartbeats sent from the progress callback.
  private final int report_period;
  // Hosting region server; stays null when the ServerName-based constructor is used,
  // in which case taskLoop() skips the recovering-regions check.
  private RegionServerServices server = null;
  // Configuration handed to HConnectionManager.getConnection() in run().
  private Configuration conf = null;
97  
98    public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf,
99        RegionServerServices server, TaskExecutor splitTaskExecutor) {
100     super(watcher);
101     this.server = server;
102     this.serverName = server.getServerName();
103     this.splitTaskExecutor = splitTaskExecutor;
104     report_period = conf.getInt("hbase.splitlog.report.period",
105       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
106     this.conf = conf;
107   }
108 
109   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, ServerName serverName,
110       TaskExecutor splitTaskExecutor) {
111     super(watcher);
112     this.serverName = serverName;
113     this.splitTaskExecutor = splitTaskExecutor;
114     report_period = conf.getInt("hbase.splitlog.report.period",
115       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
116     this.conf = conf;
117   }
118 
119   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
120       RegionServerServices server, final LastSequenceId sequenceIdChecker) {
121     this(watcher, conf, server, new TaskExecutor() {
122       @Override
123       public Status exec(String filename, CancelableProgressable p) {
124         Path rootdir;
125         FileSystem fs;
126         try {
127           rootdir = FSUtils.getRootDir(conf);
128           fs = rootdir.getFileSystem(conf);
129         } catch (IOException e) {
130           LOG.warn("could not find root dir or fs", e);
131           return Status.RESIGNED;
132         }
133         // TODO have to correctly figure out when log splitting has been
134         // interrupted or has encountered a transient error and when it has
135         // encountered a bad non-retry-able persistent error.
136         try {
137           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
138             fs, conf, p, sequenceIdChecker, watcher)) {
139             return Status.PREEMPTED;
140           }
141         } catch (InterruptedIOException iioe) {
142           LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
143           return Status.RESIGNED;
144         } catch (IOException e) {
145           Throwable cause = e.getCause();
146           if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException 
147                   || cause instanceof ConnectException 
148                   || cause instanceof SocketTimeoutException)) {
149             LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
150             		+ "resigning", e);
151             return Status.RESIGNED;
152           } else if (cause instanceof InterruptedException) {
153             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
154             return Status.RESIGNED;
155           } else if(cause instanceof KeeperException) {
156             LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
157             return Status.RESIGNED;
158           }
159           LOG.warn("log splitting of " + filename + " failed, returning error", e);
160           return Status.ERR;
161         }
162         return Status.DONE;
163       }
164     });
165   }
166 
167   @Override
168   public void run() {
169     try {
170       LOG.info("SplitLogWorker " + this.serverName + " starting");
171       this.watcher.registerListener(this);
172       // initialize a new connection for splitlogworker configuration
173       HConnectionManager.getConnection(conf);
174       int res;
175       // wait for master to create the splitLogZnode
176       res = -1;
177       while (res == -1 && !exitWorker) {
178         try {
179           res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
180         } catch (KeeperException e) {
181           // ignore
182           LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
183         }
184         if (res == -1) {
185           try {
186             LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
187             Thread.sleep(1000);
188           } catch (InterruptedException e) {
189             LOG.debug("Interrupted while waiting for " + watcher.splitLogZNode
190                 + (exitWorker ? "" : " (ERROR: exitWorker is not set, " +
191                 "exiting anyway)"));
192             exitWorker = true;
193             break;
194           }
195         }
196       }
197 
198       if (!exitWorker) {
199         taskLoop();
200       }
201     } catch (Throwable t) {
202       // only a logical error can cause here. Printing it out
203       // to make debugging easier
204       LOG.error("unexpected error ", t);
205     } finally {
206       LOG.info("SplitLogWorker " + this.serverName + " exiting");
207     }
208   }
209 
210   /**
211    * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task
212    * one at a time. This policy puts an upper-limit on the number of
213    * simultaneous log splitting that could be happening in a cluster.
214    * <p>
215    * Synchronization using {@link #taskReadyLock} ensures that it will
216    * try to grab every task that has been put up
217    */
218   private void taskLoop() {
219     while (!exitWorker) {
220       int seq_start = taskReadySeq;
221       List<String> paths = getTaskList();
222       if (paths == null) {
223         LOG.warn("Could not get tasks, did someone remove " +
224             this.watcher.splitLogZNode + " ... worker thread exiting.");
225         return;
226       }
227       // pick meta wal firstly
228       int offset = (int) (Math.random() * paths.size());
229       for(int i = 0; i < paths.size(); i ++){
230         if(HLogUtil.isMetaFile(paths.get(i))) {
231           offset = i;
232           break;
233         }
234       }
235       for (int i = 0; i < paths.size(); i ++) {
236         int idx = (i + offset) % paths.size();
237         // don't call ZKSplitLog.getNodeName() because that will lead to
238         // double encoding of the path name
239         grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
240         if (exitWorker) {
241           return;
242         }
243       }
244       synchronized (taskReadyLock) {
245         while (seq_start == taskReadySeq) {
246           try {
247             taskReadyLock.wait(checkInterval);
248             if (this.server != null) {
249               // check to see if we have stale recovering regions in our internal memory state
250               Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
251               if (!recoveringRegions.isEmpty()) {
252                 // Make a local copy to prevent ConcurrentModificationException when other threads
253                 // modify recoveringRegions
254                 List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
255                 for (String region : tmpCopy) {
256                   String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
257                   try {
258                     if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
259                       HRegion r = recoveringRegions.remove(region);
260                       if (r != null) {
261                         r.setRecovering(false);
262                       }
263                       LOG.debug("Mark recovering region:" + region + " up.");
264                     } else {
265                       // current check is a defensive(or redundant) mechanism to prevent us from
266                       // having stale recovering regions in our internal RS memory state while
267                       // zookeeper(source of truth) says differently. We stop at the first good one
268                       // because we should not have a single instance such as this in normal case so
269                       // check the first one is good enough.
270                       break;
271                     }
272                   } catch (KeeperException e) {
273                     // ignore zookeeper error
274                     LOG.debug("Got a zookeeper when trying to open a recovering region", e);
275                     break;
276                   }
277                 }
278               }
279             }
280           } catch (InterruptedException e) {
281             LOG.info("SplitLogWorker interrupted while waiting for task," +
282                 " exiting: " + e.toString() + (exitWorker ? "" :
283                 " (ERROR: exitWorker is not set, exiting anyway)"));
284             exitWorker = true;
285             return;
286           }
287         }
288       }
289 
290     }
291   }
292 
293   /**
294    * try to grab a 'lock' on the task zk node to own and execute the task.
295    * <p>
296    * @param path zk node for the task
297    */
298   private void grabTask(String path) {
299     Stat stat = new Stat();
300     long t = -1;
301     byte[] data;
302     synchronized (grabTaskLock) {
303       currentTask = path;
304       workerInGrabTask = true;
305       if (Thread.interrupted()) {
306         return;
307       }
308     }
309     try {
310       try {
311         if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
312           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
313           return;
314         }
315       } catch (KeeperException e) {
316         LOG.warn("Failed to get data for znode " + path, e);
317         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
318         return;
319       }
320       SplitLogTask slt;
321       try {
322         slt = SplitLogTask.parseFrom(data);
323       } catch (DeserializationException e) {
324         LOG.warn("Failed parse data for znode " + path, e);
325         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
326         return;
327       }
328       if (!slt.isUnassigned()) {
329         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
330         return;
331       }
332 
333       currentVersion = stat.getVersion();
334       if (!attemptToOwnTask(true)) {
335         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
336         return;
337       }
338 
339       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
340         endTask(new SplitLogTask.Done(this.serverName),
341           SplitLogCounters.tot_wkr_task_acquired_rescan);
342         return;
343       }
344       LOG.info("worker " + serverName + " acquired task " + path);
345       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
346       getDataSetWatchAsync();
347 
348       t = System.currentTimeMillis();
349       TaskExecutor.Status status;
350 
351       status = splitTaskExecutor.exec(ZKSplitLog.getFileName(currentTask),
352           new CancelableProgressable() {
353 
354         private long last_report_at = 0;
355 
356         @Override
357         public boolean progress() {
358           long t = EnvironmentEdgeManager.currentTimeMillis();
359           if ((t - last_report_at) > report_period) {
360             last_report_at = t;
361             if (!attemptToOwnTask(false)) {
362               LOG.warn("Failed to heartbeat the task" + currentTask);
363               return false;
364             }
365           }
366           return true;
367         }
368       });
369 
370       switch (status) {
371         case DONE:
372           endTask(new SplitLogTask.Done(this.serverName), SplitLogCounters.tot_wkr_task_done);
373           break;
374         case PREEMPTED:
375           SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
376           LOG.warn("task execution prempted " + path);
377           break;
378         case ERR:
379           if (!exitWorker) {
380             endTask(new SplitLogTask.Err(this.serverName), SplitLogCounters.tot_wkr_task_err);
381             break;
382           }
383           // if the RS is exiting then there is probably a tons of stuff
384           // that can go wrong. Resign instead of signaling error.
385           //$FALL-THROUGH$
386         case RESIGNED:
387           if (exitWorker) {
388             LOG.info("task execution interrupted because worker is exiting " + path);
389             endTask(new SplitLogTask.Resigned(this.serverName),
390               SplitLogCounters.tot_wkr_task_resigned);
391           } else {
392             SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
393             LOG.info("task execution interrupted via zk by manager " + path);
394           }
395           break;
396       }
397     } finally {
398       if (t > 0) {
399         LOG.info("worker " + serverName + " done with task " + path +
400             " in " + (System.currentTimeMillis() - t) + "ms");
401       }
402       synchronized (grabTaskLock) {
403         workerInGrabTask = false;
404         // clear the interrupt from stopTask() otherwise the next task will
405         // suffer
406         Thread.interrupted();
407       }
408     }
409   }
410 
411   /**
412    * Try to own the task by transitioning the zk node data from UNASSIGNED to
413    * OWNED.
414    * <p>
415    * This method is also used to periodically heartbeat the task progress by
416    * transitioning the node from OWNED to OWNED.
417    * <p>
418    * @return true if task path is successfully locked
419    */
420   private boolean attemptToOwnTask(boolean isFirstTime) {
421     try {
422       SplitLogTask slt = new SplitLogTask.Owned(this.serverName);
423       Stat stat =
424         this.watcher.getRecoverableZooKeeper().setData(currentTask, slt.toByteArray(), currentVersion);
425       if (stat == null) {
426         LOG.warn("zk.setData() returned null for path " + currentTask);
427         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
428         return (false);
429       }
430       currentVersion = stat.getVersion();
431       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
432       return (true);
433     } catch (KeeperException e) {
434       if (!isFirstTime) {
435         if (e.code().equals(KeeperException.Code.NONODE)) {
436           LOG.warn("NONODE failed to assert ownership for " + currentTask, e);
437         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
438           LOG.warn("BADVERSION failed to assert ownership for " +
439               currentTask, e);
440         } else {
441           LOG.warn("failed to assert ownership for " + currentTask, e);
442         }
443       }
444     } catch (InterruptedException e1) {
445       LOG.warn("Interrupted while trying to assert ownership of " +
446           currentTask + " " + StringUtils.stringifyException(e1));
447       Thread.currentThread().interrupt();
448     }
449     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
450     return (false);
451   }
452 
453   /**
454    * endTask() can fail and the only way to recover out of it is for the
455    * {@link SplitLogManager} to timeout the task node.
456    * @param slt
457    * @param ctr
458    */
459   private void endTask(SplitLogTask slt, AtomicLong ctr) {
460     String path = currentTask;
461     currentTask = null;
462     try {
463       if (ZKUtil.setData(this.watcher, path, slt.toByteArray(),
464           currentVersion)) {
465         LOG.info("successfully transitioned task " + path + " to final state " + slt);
466         ctr.incrementAndGet();
467         return;
468       }
469       LOG.warn("failed to transistion task " + path + " to end state " + slt +
470           " because of version mismatch ");
471     } catch (KeeperException.BadVersionException bve) {
472       LOG.warn("transisition task " + path + " to " + slt +
473           " failed because of version mismatch", bve);
474     } catch (KeeperException.NoNodeException e) {
475       LOG.fatal("logic error - end task " + path + " " + slt +
476           " failed because task doesn't exist", e);
477     } catch (KeeperException e) {
478       LOG.warn("failed to end task, " + path + " " + slt, e);
479     }
480     SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
481   }
482 
483   void getDataSetWatchAsync() {
484     this.watcher.getRecoverableZooKeeper().getZooKeeper().
485       getData(currentTask, this.watcher,
486       new GetDataAsyncCallback(), null);
487     SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
488   }
489 
490   void getDataSetWatchSuccess(String path, byte[] data) {
491     SplitLogTask slt;
492     try {
493       slt = SplitLogTask.parseFrom(data);
494     } catch (DeserializationException e) {
495       LOG.warn("Failed parse", e);
496       return;
497     }
498     synchronized (grabTaskLock) {
499       if (workerInGrabTask) {
500         // currentTask can change but that's ok
501         String taskpath = currentTask;
502         if (taskpath != null && taskpath.equals(path)) {
503           // have to compare data. cannot compare version because then there
504           // will be race with attemptToOwnTask()
505           // cannot just check whether the node has been transitioned to
506           // UNASSIGNED because by the time this worker sets the data watch
507           // the node might have made two transitions - from owned by this
508           // worker to unassigned to owned by another worker
509           if (! slt.isOwned(this.serverName) &&
510               ! slt.isDone(this.serverName) &&
511               ! slt.isErr(this.serverName) &&
512               ! slt.isResigned(this.serverName)) {
513             LOG.info("task " + taskpath + " preempted from " +
514                 serverName + ", current task state and owner=" + slt.toString());
515             stopTask();
516           }
517         }
518       }
519     }
520   }
521 
522   void getDataSetWatchFailure(String path) {
523     synchronized (grabTaskLock) {
524       if (workerInGrabTask) {
525         // currentTask can change but that's ok
526         String taskpath = currentTask;
527         if (taskpath != null && taskpath.equals(path)) {
528           LOG.info("retrying data watch on " + path);
529           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
530           getDataSetWatchAsync();
531         } else {
532           // no point setting a watch on the task which this worker is not
533           // working upon anymore
534         }
535       }
536     }
537   }
538 
539   @Override
540   public void nodeDataChanged(String path) {
541     // there will be a self generated dataChanged event every time attemptToOwnTask()
542     // heartbeats the task znode by upping its version
543     synchronized (grabTaskLock) {
544       if (workerInGrabTask) {
545         // currentTask can change
546         String taskpath = currentTask;
547         if (taskpath!= null && taskpath.equals(path)) {
548           getDataSetWatchAsync();
549         }
550       }
551     }
552   }
553 
554 
555   private List<String> getTaskList() {
556     List<String> childrenPaths = null;
557     long sleepTime = 1000;
558     // It will be in loop till it gets the list of children or
559     // it will come out if worker thread exited.
560     while (!exitWorker) {
561       try {
562         childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
563             this.watcher.splitLogZNode);
564         if (childrenPaths != null) {
565           return childrenPaths;
566         }
567       } catch (KeeperException e) {
568         LOG.warn("Could not get children of znode "
569             + this.watcher.splitLogZNode, e);
570       }
571       try {
572         LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
573             + " after sleep for " + sleepTime + "ms!");
574         Thread.sleep(sleepTime);
575       } catch (InterruptedException e1) {
576         LOG.warn("Interrupted while trying to get task list ...", e1);
577         Thread.currentThread().interrupt();
578       }
579     }
580     return childrenPaths;
581   }
582 
583   @Override
584   public void nodeChildrenChanged(String path) {
585     if(path.equals(watcher.splitLogZNode)) {
586       LOG.debug("tasks arrived or departed");
587       synchronized (taskReadyLock) {
588         taskReadySeq++;
589         taskReadyLock.notify();
590       }
591     }
592   }
593 
594   /**
595    * If the worker is doing a task i.e. splitting a log file then stop the task.
596    * It doesn't exit the worker thread.
597    */
598   void stopTask() {
599     LOG.info("Sending interrupt to stop the worker thread");
600     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
601   }
602 
603 
604   /**
605    * start the SplitLogWorker thread
606    */
607   public void start() {
608     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
609     exitWorker = false;
610     worker.start();
611   }
612 
613   /**
614    * stop the SplitLogWorker thread
615    */
616   public void stop() {
617     exitWorker = true;
618     stopTask();
619   }
620 
621   /**
622    * Asynchronous handler for zk get-data-set-watch on node results.
623    */
624   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
625     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
626 
627     @Override
628     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
629       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
630       if (rc != 0) {
631         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
632         getDataSetWatchFailure(path);
633         return;
634       }
635       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
636       getDataSetWatchSuccess(path, data);
637     }
638   }
639 
640   /**
641    * Objects implementing this interface actually do the task that has been
642    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
643    * guarantee that two workers will not be executing the same task therefore it
644    * is better to have workers prepare the task and then have the
645    * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher
646    */
647   static public interface TaskExecutor {
648     static public enum Status {
649       DONE(),
650       ERR(),
651       RESIGNED(),
652       PREEMPTED()
653     }
654     public Status exec(String name, CancelableProgressable p);
655   }
656 }