1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.net.ConnectException;
24  import java.net.SocketTimeoutException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicInteger;
29  
30  import org.apache.commons.lang.math.RandomUtils;
31  import org.apache.commons.lang.mutable.MutableInt;
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.conf.Configuration;
36  import org.apache.hadoop.fs.FileSystem;
37  import org.apache.hadoop.fs.Path;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.NotServingRegionException;
40  import org.apache.hadoop.hbase.ServerName;
41  import org.apache.hadoop.hbase.SplitLogCounters;
42  import org.apache.hadoop.hbase.SplitLogTask;
43  import org.apache.hadoop.hbase.client.HConnectionManager;
44  import org.apache.hadoop.hbase.client.RetriesExhaustedException;
45  import org.apache.hadoop.hbase.exceptions.DeserializationException;
46  import org.apache.hadoop.hbase.executor.ExecutorService;
47  import org.apache.hadoop.hbase.master.SplitLogManager;
48  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
49  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
50  import org.apache.hadoop.hbase.regionserver.handler.HLogSplitterHandler;
51  import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
52  import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
53  import org.apache.hadoop.hbase.util.CancelableProgressable;
54  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
55  import org.apache.hadoop.hbase.util.ExceptionUtil;
56  import org.apache.hadoop.hbase.util.FSUtils;
57  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
58  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
59  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
60  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
61  import org.apache.hadoop.util.StringUtils;
62  import org.apache.zookeeper.AsyncCallback;
63  import org.apache.zookeeper.KeeperException;
64  import org.apache.zookeeper.data.Stat;
65  
66  /**
67   * This worker is spawned in every regionserver (should we also spawn one in
68   * the master?). The Worker waits for log splitting tasks to be put up by the
69   * {@link SplitLogManager} running in the master and races with other workers
70   * in other servers to acquire those tasks. The coordination is done via
71   * ZooKeeper. All the action takes place at the /hbase/splitlog znode.
72   * <p>
73   * If a worker has successfully moved the task from state UNASSIGNED to
74   * OWNED then it owns the task. It keeps heartbeating the manager by
75   * periodically re-asserting the OWNED state of the task. On success it
76   * moves the task to TASK_DONE. On an unrecoverable error it moves the task
77   * state to ERR. If it cannot continue but wants the master to retry the task
78   * then it moves the task state to RESIGNED.
79   * <p>
80   * The manager can take a task away from a worker by moving the task from
81   * OWNED to UNASSIGNED. In the absence of a global lock there is an
82   * unavoidable race here - a worker might have just finished its task when it
83   * is stripped of its ownership. Here we rely on the idempotency of the log
84   * splitting task for correctness.
85   */
86  @InterfaceAudience.Private
87  public class SplitLogWorker extends ZooKeeperListener implements Runnable {
88    public static final int DEFAULT_MAX_SPLITTERS = 2;
89  
90    private static final Log LOG = LogFactory.getLog(SplitLogWorker.class);
91    private static final int checkInterval = 5000; // 5 seconds
92    private static final int FAILED_TO_OWN_TASK = -1;
93  
94    Thread worker;
95    private final ServerName serverName;
96    private final TaskExecutor splitTaskExecutor;
97    // thread pool which executes recovery work
98    private final ExecutorService executorService;
99  
100   private final Object taskReadyLock = new Object();
101   volatile int taskReadySeq = 0;
102   private volatile String currentTask = null;
103   private int currentVersion;
104   private volatile boolean exitWorker;
105   private final Object grabTaskLock = new Object();
106   private boolean workerInGrabTask = false;
107   private final int report_period;
108   private RegionServerServices server = null;
109   private Configuration conf = null;
110   protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
111   private int maxConcurrentTasks = 0;
112 
113   public SplitLogWorker(ZooKeeperWatcher watcher, Configuration conf, RegionServerServices server,
114       TaskExecutor splitTaskExecutor) {
115     super(watcher);
116     this.server = server;
117     this.serverName = server.getServerName();
118     this.splitTaskExecutor = splitTaskExecutor;
119     report_period = conf.getInt("hbase.splitlog.report.period",
120       conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 3);
121     this.conf = conf;
122     this.executorService = this.server.getExecutorService();
123     this.maxConcurrentTasks =
124         conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
125   }
126 
127   public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
128       final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
129     this(watcher, conf, server, new TaskExecutor() {
130       @Override
131       public Status exec(String filename, RecoveryMode mode, CancelableProgressable p) {
132         Path rootdir;
133         FileSystem fs;
134         try {
135           rootdir = FSUtils.getRootDir(conf);
136           fs = rootdir.getFileSystem(conf);
137         } catch (IOException e) {
138           LOG.warn("could not find root dir or fs", e);
139           return Status.RESIGNED;
140         }
141         // TODO have to correctly figure out when log splitting has been
142         // interrupted or has encountered a transient error and when it has
143         // encountered a bad non-retry-able persistent error.
144         try {
145           if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
146             fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager(), mode)) {
147             return Status.PREEMPTED;
148           }
149         } catch (InterruptedIOException iioe) {
150           LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
151           return Status.RESIGNED;
152         } catch (IOException e) {
153           Throwable cause = e.getCause();
154           if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException 
155                   || cause instanceof ConnectException 
156                   || cause instanceof SocketTimeoutException)) {
157             LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
158                 + "resigning", e);
159             return Status.RESIGNED;
160           } else if (cause instanceof InterruptedException) {
161             LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
162             return Status.RESIGNED;
163           } else if(cause instanceof KeeperException) {
164             LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
165             return Status.RESIGNED;
166           }
167           LOG.warn("log splitting of " + filename + " failed, returning error", e);
168           return Status.ERR;
169         }
170         return Status.DONE;
171       }
172     });
173   }
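      // The TaskExecutor above maps split outcomes to task states: success is DONE, a
      // splitLogFile() call that returns false is PREEMPTED, transient failures (interrupts,
      // connection or ZooKeeper problems) are RESIGNED so the master can retry elsewhere, and
      // any other IOException is ERR.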
174 
175   @Override
176   public void run() {
177     try {
178       LOG.info("SplitLogWorker " + this.serverName + " starting");
179       this.watcher.registerListener(this);
180       // pre-initialize a new connection for splitlogworker configuration
181       HConnectionManager.getConnection(conf);
182 
183       // wait for master to create the splitLogZnode
184       int res = -1;
185       while (res == -1 && !exitWorker) {
186         try {
187           res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
188         } catch (KeeperException e) {
189           // ignore
190           LOG.warn("Exception when checking for " + watcher.splitLogZNode  + " ... retrying", e);
191         }
192         if (res == -1) {
193           LOG.info(watcher.splitLogZNode + " znode does not exist, waiting for master to create");
194           Thread.sleep(1000);
195         }
196       }
197 
198       if (!exitWorker) {
199         taskLoop();
200       }
201     } catch (Throwable t) {
202       if (ExceptionUtil.isInterrupt(t)) {
203         LOG.info("SplitLogWorker interrupted. Exiting. " + (exitWorker ? "" :
204             " (ERROR: exitWorker is not set, exiting anyway)"));
205       } else {
206         // only a logic error can end up here. Printing it out
207         // to make debugging easier
208         LOG.error("unexpected error ", t);
209       }
210     } finally {
211       LOG.info("SplitLogWorker " + this.serverName + " exiting");
212     }
213   }
214 
215   /**
216    * Wait for tasks to become available at the /hbase/splitlog znode. Grab a task
217    * one at a time. This policy puts an upper limit on the number of
218    * simultaneous log splitting tasks that could be happening in a cluster.
219    * <p>
220    * Synchronization using {@link #taskReadyLock} ensures that it will
221    * try to grab every task that has been put up.
222    */
223   private void taskLoop() throws InterruptedException {
224     while (!exitWorker) {
225       int seq_start = taskReadySeq;
226       List<String> paths = getTaskList();
227       if (paths == null) {
228         LOG.warn("Could not get tasks, did someone remove " +
229             this.watcher.splitLogZNode + " ... worker thread exiting.");
230         return;
231       }
232       // pick the meta WAL first
233       int offset = (int) (Math.random() * paths.size());
234       for (int i = 0; i < paths.size(); i++) {
235         if (HLogUtil.isMetaFile(paths.get(i))) {
236           offset = i;
237           break;
238         }
239       }
240       int numTasks = paths.size();
241       for (int i = 0; i < numTasks; i++) {
242         int idx = (i + offset) % paths.size();
243         // don't call ZKSplitLog.getNodeName() because that will lead to
244         // double encoding of the path name
245         if (this.calculateAvailableSplitters(numTasks) > 0) {
246           grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
247         } else {
248           LOG.debug("Current region server " + this.serverName + " has "
249               + this.tasksInProgress.get() + " tasks in progress and can't take more.");
250           break;
251         }
252         if (exitWorker) {
253           return;
254         }
255       }
256       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
257       synchronized (taskReadyLock) {
258         while (seq_start == taskReadySeq) {
259           taskReadyLock.wait(checkInterval);
260           if (this.server != null) {
261             // check to see if we have stale recovering regions in our internal memory state
262             Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
263             if (!recoveringRegions.isEmpty()) {
264               // Make a local copy to prevent ConcurrentModificationException when other threads
265               // modify recoveringRegions
266               List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
267               for (String region : tmpCopy) {
268                 String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
269                 try {
270                   if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
271                     HRegion r = recoveringRegions.remove(region);
272                     if (r != null) {
273                       r.setRecovering(false);
274                     }
275                     LOG.debug("Mark recovering region: " + region + " up.");
276                   } else {
277                     // This check is a defensive (or redundant) mechanism to prevent us from
278                     // having stale recovering regions in our internal RS memory state while
279                     // ZooKeeper (the source of truth) says differently. We stop at the first
280                     // znode that still exists because we should not see such an instance in
281                     // the normal case, so checking the first one is good enough.
282                     break;
283                   }
284                 } catch (KeeperException e) {
285                   // ignore ZooKeeper error
286                   LOG.debug("Got a ZooKeeper exception when trying to open a recovering region", e);
287                   break;
288                 }
289               }
290             }
291           }
292         }
293       }
294     }
295   }
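      // Note on task ordering in taskLoop(): the random start offset spreads competing workers
      // across the task list so they do not all race for the same znode, while a meta WAL, if
      // present in the list, is always attempted first.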
296 
297   /**
298    * Try to grab a 'lock' on the task znode to own and execute the task.
299    * <p>
300    * @param path zk node for the task
301    */
302   private void grabTask(String path) {
303     Stat stat = new Stat();
304     byte[] data;
305     synchronized (grabTaskLock) {
306       currentTask = path;
307       workerInGrabTask = true;
308       if (Thread.interrupted()) {
309         return;
310       }
311     }
312     try {
313       try {
314         if ((data = ZKUtil.getDataNoWatch(this.watcher, path, stat)) == null) {
315           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
316           return;
317         }
318       } catch (KeeperException e) {
319         LOG.warn("Failed to get data for znode " + path, e);
320         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
321         return;
322       }
323       SplitLogTask slt;
324       try {
325         slt = SplitLogTask.parseFrom(data);
326       } catch (DeserializationException e) {
327         LOG.warn("Failed to parse data for znode " + path, e);
328         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
329         return;
330       }
331       if (!slt.isUnassigned()) {
332         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
333         return;
334       }
335 
336       currentVersion = attemptToOwnTask(true, watcher, serverName, path, slt.getMode(),
337         stat.getVersion());
338       if (currentVersion < 0) {
339         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
340         return;
341       }
342 
343       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
344         HLogSplitterHandler.endTask(watcher, new SplitLogTask.Done(this.serverName, slt.getMode()),
345           SplitLogCounters.tot_wkr_task_acquired_rescan, currentTask, currentVersion);
346         return;
347       }
348 
349       LOG.info("worker " + serverName + " acquired task " + path);
350       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
351       getDataSetWatchAsync();
352 
353       submitTask(path, slt.getMode(), currentVersion, this.report_period);
354 
355       // after a successful submit, sleep a little bit to allow other RSs to grab the remaining tasks
356       try {
357         int sleepTime = RandomUtils.nextInt(500) + 500;
358         Thread.sleep(sleepTime);
359       } catch (InterruptedException e) {
360         LOG.warn("Interrupted while yielding for other region servers", e);
361         Thread.currentThread().interrupt();
362       }
363     } finally {
364       synchronized (grabTaskLock) {
365         workerInGrabTask = false;
366         // clear the interrupt from stopTask() otherwise the next task will
367         // suffer
368         Thread.interrupted();
369       }
370     }
371   }
372 
373 
374   /**
375    * Try to own the task by transitioning the znode data from UNASSIGNED to OWNED.
376    * <p>
377    * This method is also used to periodically heartbeat the task progress by transitioning the
378    * node from OWNED to OWNED.
379    * @param isFirstTime true on the first attempt to own the task, false for heartbeats
380    * @param zkw ZooKeeper watcher used to talk to the task znode
381    * @param server name of the region server trying to own the task
382    * @param task path of the task znode
383    * @param mode recovery mode recorded in the task data
384    * @param taskZKVersion expected version of the task znode
385    * @return non-negative znode version when the task is owned by the current region server, otherwise -1
386    */
387   protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
388       ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
389     int latestZKVersion = FAILED_TO_OWN_TASK;
390     try {
391       SplitLogTask slt = new SplitLogTask.Owned(server, mode);
392       Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
393       if (stat == null) {
394         LOG.warn("zk.setData() returned null for path " + task);
395         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
396         return FAILED_TO_OWN_TASK;
397       }
398       latestZKVersion = stat.getVersion();
399       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
400       return latestZKVersion;
401     } catch (KeeperException e) {
402       if (!isFirstTime) {
403         if (e.code().equals(KeeperException.Code.NONODE)) {
404           LOG.warn("NONODE failed to assert ownership for " + task, e);
405         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
406           LOG.warn("BADVERSION failed to assert ownership for " + task, e);
407         } else {
408           LOG.warn("failed to assert ownership for " + task, e);
409         }
410       }
411     } catch (InterruptedException e1) {
412       LOG.warn("Interrupted while trying to assert ownership of " +
413           task + " " + StringUtils.stringifyException(e1));
414       Thread.currentThread().interrupt();
415     }
416     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
417     return FAILED_TO_OWN_TASK;
418   }
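      // The conditional setData(task, data, taskZKVersion) above acts as an optimistic lock: it
      // succeeds only if the znode is still at the expected version, otherwise ZooKeeper throws
      // BADVERSION and ownership is not asserted. The version returned in the Stat is handed back
      // to the caller, which uses it for the next heartbeat (see the reporter in submitTask()).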
419 
420   /**
421    * This function calculates how many splitters this region server can create based on the expected
422    * average number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration. <br>
423    * At any given time, an RS may run at most MIN(Expected Tasks/RS, Hard Upper Bound) splitters.
424    * @param numTasks current total number of available tasks
425    */
426   private int calculateAvailableSplitters(int numTasks) {
427     // at least one RS (itself) is available
428     int availableRSs = 1;
429     try {
430       List<String> regionServers = ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
431       availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
432     } catch (KeeperException e) {
433       // do nothing
434       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
435     }
436 
437     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
438     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
439     // calculate how many more splitters we could spawn
440     return Math.min(expectedTasksPerRS, this.maxConcurrentTasks) - this.tasksInProgress.get();
441   }
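      // Worked example (hypothetical numbers): with numTasks = 10, four live region servers and
      // hbase.regionserver.wal.max.splitters = 2, expectedTasksPerRS = ceil(10 / 4) = 3, so this
      // method returns min(3, 2) - tasksInProgress, i.e. at most 2 more splitters on this server.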
442 
443   /**
444    * Submit a log split task to the executor service
445    * @param curTask path of the task znode to split
446    * @param curTaskZKVersion current version of the task znode, used for heartbeating ownership
447    */
448   void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
449       final int reportPeriod) {
450     final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
451 
452     CancelableProgressable reporter = new CancelableProgressable() {
453       private long last_report_at = 0;
454 
455       @Override
456       public boolean progress() {
457         long t = EnvironmentEdgeManager.currentTimeMillis();
458         if ((t - last_report_at) > reportPeriod) {
459           last_report_at = t;
460           int latestZKVersion = attemptToOwnTask(false, watcher, serverName, curTask, mode,
461             zkVersion.intValue());
462           if (latestZKVersion < 0) {
463             LOG.warn("Failed to heartbeat the task " + curTask);
464             return false;
465           }
466           zkVersion.setValue(latestZKVersion);
467         }
468         return true;
469       }
470     };
471     
472     HLogSplitterHandler hsh = new HLogSplitterHandler(this.server, curTask, zkVersion, reporter, 
473       this.tasksInProgress, this.splitTaskExecutor, mode);
474     this.executorService.submit(hsh);
475   }
476 
477   void getDataSetWatchAsync() {
478     this.watcher.getRecoverableZooKeeper().getZooKeeper().
479       getData(currentTask, this.watcher,
480       new GetDataAsyncCallback(), null);
481     SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
482   }
483 
484   void getDataSetWatchSuccess(String path, byte[] data) {
485     SplitLogTask slt;
486     try {
487       slt = SplitLogTask.parseFrom(data);
488     } catch (DeserializationException e) {
489       LOG.warn("Failed to parse data", e);
490       return;
491     }
492     synchronized (grabTaskLock) {
493       if (workerInGrabTask) {
494         // currentTask can change but that's ok
495         String taskpath = currentTask;
496         if (taskpath != null && taskpath.equals(path)) {
497           // We have to compare the data; we cannot compare versions because
498           // that would race with attemptToOwnTask().
499           // We also cannot just check whether the node has transitioned to
500           // UNASSIGNED because, by the time this worker sets the data watch,
501           // the node might have made two transitions - from owned by this
502           // worker, to unassigned, to owned by another worker.
503           if (! slt.isOwned(this.serverName) &&
504               ! slt.isDone(this.serverName) &&
505               ! slt.isErr(this.serverName) &&
506               ! slt.isResigned(this.serverName)) {
507             LOG.info("task " + taskpath + " preempted from " +
508                 serverName + ", current task state and owner=" + slt.toString());
509             stopTask();
510           }
511         }
512       }
513     }
514   }
515 
516   void getDataSetWatchFailure(String path) {
517     synchronized (grabTaskLock) {
518       if (workerInGrabTask) {
519         // currentTask can change but that's ok
520         String taskpath = currentTask;
521         if (taskpath != null && taskpath.equals(path)) {
522           LOG.info("retrying data watch on " + path);
523           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
524           getDataSetWatchAsync();
525         } else {
526           // no point setting a watch on the task which this worker is not
527           // working upon anymore
528         }
529       }
530     }
531   }
532 
533   @Override
534   public void nodeDataChanged(String path) {
535     // there will be a self-generated dataChanged event every time attemptToOwnTask()
536     // heartbeats the task znode by bumping its version
537     synchronized (grabTaskLock) {
538       if (workerInGrabTask) {
539         // currentTask can change
540         String taskpath = currentTask;
541         if (taskpath != null && taskpath.equals(path)) {
542           getDataSetWatchAsync();
543         }
544       }
545     }
546   }
547 
548 
549   private List<String> getTaskList() throws InterruptedException {
550     List<String> childrenPaths = null;
551     long sleepTime = 1000;
552     // Loop until the list of children is obtained, or return if the
553     // worker thread has been asked to exit.
554     while (!exitWorker) {
555       try {
556         childrenPaths = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
557             this.watcher.splitLogZNode);
558         if (childrenPaths != null) {
559           return childrenPaths;
560         }
561       } catch (KeeperException e) {
562         LOG.warn("Could not get children of znode "
563             + this.watcher.splitLogZNode, e);
564       }
565       LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode
566           + " after sleep for " + sleepTime + "ms!");
567       Thread.sleep(sleepTime);
568     }
569     return childrenPaths;
570   }
571 
572   @Override
573   public void nodeChildrenChanged(String path) {
574     if (path.equals(watcher.splitLogZNode)) {
575       LOG.debug("tasks arrived or departed");
576       synchronized (taskReadyLock) {
577         taskReadySeq++;
578         taskReadyLock.notify();
579       }
580     }
581   }
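      // Bumping taskReadySeq under taskReadyLock wakes up taskLoop(), which notices that the
      // sequence number changed since it last scanned and rescans the splitlog znode for new tasks.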
582 
583   /**
584    * If the worker is doing a task, i.e. splitting a log file, then stop the task.
585    * It doesn't exit the worker thread.
586    */
587   void stopTask() {
588     LOG.info("Sending interrupt to stop the worker thread");
589     worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
590   }
591 
592 
593   /**
594    * start the SplitLogWorker thread
595    */
596   public void start() {
597     worker = new Thread(null, this, "SplitLogWorker-" + serverName);
598     exitWorker = false;
599     worker.start();
600   }
601 
602   /**
603    * stop the SplitLogWorker thread
604    */
605   public void stop() {
606     exitWorker = true;
607     stopTask();
608   }
609 
610   /**
611    * Asynchronous handler for the results of zk get-data-and-set-watch calls on task nodes.
612    */
613   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
614     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
615 
616     @Override
617     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
618       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
619       if (rc != 0) {
620         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
621         getDataSetWatchFailure(path);
622         return;
623       }
624       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
625       getDataSetWatchSuccess(path, data);
626     }
627   }
628 
629   /**
630    * Objects implementing this interface actually do the task that has been
631    * acquired by a {@link SplitLogWorker}. Since there isn't a water-tight
632    * guarantee that two workers will not be executing the same task, it
633    * is better to have workers prepare the task and then have the
634    * {@link SplitLogManager} commit the work in SplitLogManager.TaskFinisher.
635    */
636   public interface TaskExecutor {
637     enum Status {
638       DONE(),
639       ERR(),
640       RESIGNED(),
641       PREEMPTED()
642     }
643     Status exec(String name, RecoveryMode mode, CancelableProgressable p);
644   }
645 }