View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  package org.apache.hadoop.hbase.coordination;
21  
22  import java.io.IOException;
23  import java.util.ArrayList;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicInteger;
27  import java.util.concurrent.atomic.AtomicLong;
28  
29  import org.apache.commons.lang.math.RandomUtils;
30  import org.apache.commons.lang.mutable.MutableInt;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.conf.Configuration;
34  import org.apache.hadoop.fs.FileSystem;
35  import org.apache.hadoop.fs.Path;
36  import org.apache.hadoop.hbase.HConstants;
37  import org.apache.hadoop.hbase.ServerName;
38  import org.apache.hadoop.hbase.SplitLogCounters;
39  import org.apache.hadoop.hbase.SplitLogTask;
40  import org.apache.hadoop.hbase.classification.InterfaceAudience;
41  import org.apache.hadoop.hbase.exceptions.DeserializationException;
42  import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
43  import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
44  import org.apache.hadoop.hbase.regionserver.Region;
45  import org.apache.hadoop.hbase.regionserver.RegionServerServices;
46  import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
47  import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor;
48  import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
49  import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
50  import org.apache.hadoop.hbase.util.CancelableProgressable;
51  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
52  import org.apache.hadoop.hbase.wal.DefaultWALProvider;
53  import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
54  import org.apache.hadoop.hbase.zookeeper.ZKUtil;
55  import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
56  import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
57  import org.apache.hadoop.util.StringUtils;
58  import org.apache.zookeeper.AsyncCallback;
59  import org.apache.zookeeper.KeeperException;
60  import org.apache.zookeeper.data.Stat;
61  
62  /**
63   * ZooKeeper based implementation of {@link SplitLogWorkerCoordination}
64   * It listen for changes in ZooKeeper and
65   *
66   */
67  @InterfaceAudience.Private
68  public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
69      SplitLogWorkerCoordination {
70  
71    private static final Log LOG = LogFactory.getLog(ZkSplitLogWorkerCoordination.class);
72  
73    private static final int checkInterval = 5000; // 5 seconds
74    private static final int FAILED_TO_OWN_TASK = -1;
75  
76    private  SplitLogWorker worker;
77  
78    private TaskExecutor splitTaskExecutor;
79  
80    private final Object taskReadyLock = new Object();
81    volatile int taskReadySeq = 0;
82    private volatile String currentTask = null;
83    private int currentVersion;
84    private volatile boolean shouldStop = false;
85    private final Object grabTaskLock = new Object();
86    private boolean workerInGrabTask = false;
87    private int reportPeriod;
88    private RegionServerServices server = null;
89    protected final AtomicInteger tasksInProgress = new AtomicInteger(0);
90    private int maxConcurrentTasks = 0;
91  
92    private final ZkCoordinatedStateManager manager;
93  
94    public ZkSplitLogWorkerCoordination(ZkCoordinatedStateManager zkCoordinatedStateManager,
95        ZooKeeperWatcher watcher) {
96      super(watcher);
97      manager = zkCoordinatedStateManager;
98  
99    }
100 
101   /**
102    * Override handler from {@link ZooKeeperListener}
103    */
104   @Override
105   public void nodeChildrenChanged(String path) {
106     if (path.equals(watcher.splitLogZNode)) {
107       LOG.debug("tasks arrived or departed");
108       synchronized (taskReadyLock) {
109         taskReadySeq++;
110         taskReadyLock.notify();
111       }
112     }
113   }
114 
115   /**
116    * Override handler from {@link ZooKeeperListener}
117    */
118   @Override
119   public void nodeDataChanged(String path) {
120     // there will be a self generated dataChanged event every time attemptToOwnTask()
121     // heartbeats the task znode by upping its version
122     synchronized (grabTaskLock) {
123       if (workerInGrabTask) {
124         // currentTask can change
125         String taskpath = currentTask;
126         if (taskpath != null && taskpath.equals(path)) {
127           getDataSetWatchAsync();
128         }
129       }
130     }
131   }
132 
133   /**
134    * Override setter from {@link SplitLogWorkerCoordination}
135    */
136   @Override
137   public void init(RegionServerServices server, Configuration conf,
138       TaskExecutor splitExecutor, SplitLogWorker worker) {
139     this.server = server;
140     this.worker = worker;
141     this.splitTaskExecutor = splitExecutor;
142     maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
143     reportPeriod =
144         conf.getInt("hbase.splitlog.report.period",
145           conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
146             ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT) / 3);
147   }
148 
149   /* Support functions for Zookeeper async callback */
150 
151   void getDataSetWatchFailure(String path) {
152     synchronized (grabTaskLock) {
153       if (workerInGrabTask) {
154         // currentTask can change but that's ok
155         String taskpath = currentTask;
156         if (taskpath != null && taskpath.equals(path)) {
157           LOG.info("retrying data watch on " + path);
158           SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet();
159           getDataSetWatchAsync();
160         } else {
161           // no point setting a watch on the task which this worker is not
162           // working upon anymore
163         }
164       }
165     }
166   }
167 
168   public void getDataSetWatchAsync() {
169     watcher.getRecoverableZooKeeper().getZooKeeper()
170         .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
171     SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet();
172   }
173 
174   void getDataSetWatchSuccess(String path, byte[] data) {
175     SplitLogTask slt;
176     try {
177       slt = SplitLogTask.parseFrom(data);
178     } catch (DeserializationException e) {
179       LOG.warn("Failed parse", e);
180       return;
181     }
182     synchronized (grabTaskLock) {
183       if (workerInGrabTask) {
184         // currentTask can change but that's ok
185         String taskpath = currentTask;
186         if (taskpath != null && taskpath.equals(path)) {
187           ServerName serverName = manager.getServer().getServerName();
188           // have to compare data. cannot compare version because then there
189           // will be race with attemptToOwnTask()
190           // cannot just check whether the node has been transitioned to
191           // UNASSIGNED because by the time this worker sets the data watch
192           // the node might have made two transitions - from owned by this
193           // worker to unassigned to owned by another worker
194           if (!slt.isOwned(serverName) && !slt.isDone(serverName) && !slt.isErr(serverName)
195               && !slt.isResigned(serverName)) {
196             LOG.info("task " + taskpath + " preempted from " + serverName
197                 + ", current task state and owner=" + slt.toString());
198             worker.stopTask();
199           }
200         }
201       }
202     }
203   }
204 
205   /**
206    * try to grab a 'lock' on the task zk node to own and execute the task.
207    * <p>
208    * @param path zk node for the task
209    */
210   private void grabTask(String path) {
211     Stat stat = new Stat();
212     byte[] data;
213     synchronized (grabTaskLock) {
214       currentTask = path;
215       workerInGrabTask = true;
216       if (Thread.interrupted()) {
217         return;
218       }
219     }
220     try {
221       try {
222         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
223           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
224           return;
225         }
226       } catch (KeeperException e) {
227         LOG.warn("Failed to get data for znode " + path, e);
228         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
229         return;
230       }
231       SplitLogTask slt;
232       try {
233         slt = SplitLogTask.parseFrom(data);
234       } catch (DeserializationException e) {
235         LOG.warn("Failed parse data for znode " + path, e);
236         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
237         return;
238       }
239       if (!slt.isUnassigned()) {
240         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
241         return;
242       }
243 
244       currentVersion =
245           attemptToOwnTask(true, watcher, server.getServerName(), path,
246             slt.getMode(), stat.getVersion());
247       if (currentVersion < 0) {
248         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
249         return;
250       }
251 
252       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
253         ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
254             new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
255         splitTaskDetails.setTaskNode(currentTask);
256         splitTaskDetails.setCurTaskZKVersion(new MutableInt(currentVersion));
257 
258         endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
259           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
260         return;
261       }
262 
263       LOG.info("worker " + server.getServerName() + " acquired task " + path);
264       SplitLogCounters.tot_wkr_task_acquired.incrementAndGet();
265       getDataSetWatchAsync();
266 
267       submitTask(path, slt.getMode(), currentVersion, reportPeriod);
268 
269       // after a successful submit, sleep a little bit to allow other RSs to grab the rest tasks
270       try {
271         int sleepTime = RandomUtils.nextInt(500) + 500;
272         Thread.sleep(sleepTime);
273       } catch (InterruptedException e) {
274         LOG.warn("Interrupted while yielding for other region servers", e);
275         Thread.currentThread().interrupt();
276       }
277     } finally {
278       synchronized (grabTaskLock) {
279         workerInGrabTask = false;
280         // clear the interrupt from stopTask() otherwise the next task will
281         // suffer
282         Thread.interrupted();
283       }
284     }
285   }
286 
287   /**
288    * Submit a log split task to executor service
289    * @param curTask task to submit
290    * @param curTaskZKVersion current version of task
291    */
292   void submitTask(final String curTask, final RecoveryMode mode, final int curTaskZKVersion,
293       final int reportPeriod) {
294     final MutableInt zkVersion = new MutableInt(curTaskZKVersion);
295 
296     CancelableProgressable reporter = new CancelableProgressable() {
297       private long last_report_at = 0;
298 
299       @Override
300       public boolean progress() {
301         long t = EnvironmentEdgeManager.currentTime();
302         if ((t - last_report_at) > reportPeriod) {
303           last_report_at = t;
304           int latestZKVersion =
305               attemptToOwnTask(false, watcher, server.getServerName(), curTask,
306                 mode, zkVersion.intValue());
307           if (latestZKVersion < 0) {
308             LOG.warn("Failed to heartbeat the task" + curTask);
309             return false;
310           }
311           zkVersion.setValue(latestZKVersion);
312         }
313         return true;
314       }
315     };
316     ZkSplitLogWorkerCoordination.ZkSplitTaskDetails splitTaskDetails =
317         new ZkSplitLogWorkerCoordination.ZkSplitTaskDetails();
318     splitTaskDetails.setTaskNode(curTask);
319     splitTaskDetails.setCurTaskZKVersion(zkVersion);
320 
321     WALSplitterHandler hsh =
322         new WALSplitterHandler(server, this, splitTaskDetails, reporter,
323             this.tasksInProgress, splitTaskExecutor, mode);
324     server.getExecutorService().submit(hsh);
325   }
326 
327   /**
328    * This function calculates how many splitters it could create based on expected average tasks per
329    * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
330    * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
331    * @param numTasks current total number of available tasks
332    */
333   private int calculateAvailableSplitters(int numTasks) {
334     // at lease one RS(itself) available
335     int availableRSs = 1;
336     try {
337       List<String> regionServers =
338           ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode);
339       availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size());
340     } catch (KeeperException e) {
341       // do nothing
342       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
343     }
344 
345     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
346     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
347     // calculate how many more splitters we could spawn
348     return Math.min(expectedTasksPerRS, maxConcurrentTasks)
349         - this.tasksInProgress.get();
350   }
351 
352   /**
353    * Try to own the task by transitioning the zk node data from UNASSIGNED to OWNED.
354    * <p>
355    * This method is also used to periodically heartbeat the task progress by transitioning the node
356    * from OWNED to OWNED.
357    * <p>
358    * @param isFirstTime shows whther it's the first attempt.
359    * @param zkw zk wathcer
360    * @param server name
361    * @param task to own
362    * @param taskZKVersion version of the task in zk
363    * @return non-negative integer value when task can be owned by current region server otherwise -1
364    */
365   protected static int attemptToOwnTask(boolean isFirstTime, ZooKeeperWatcher zkw,
366       ServerName server, String task, RecoveryMode mode, int taskZKVersion) {
367     int latestZKVersion = FAILED_TO_OWN_TASK;
368     try {
369       SplitLogTask slt = new SplitLogTask.Owned(server, mode);
370       Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
371       if (stat == null) {
372         LOG.warn("zk.setData() returned null for path " + task);
373         SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
374         return FAILED_TO_OWN_TASK;
375       }
376       latestZKVersion = stat.getVersion();
377       SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet();
378       return latestZKVersion;
379     } catch (KeeperException e) {
380       if (!isFirstTime) {
381         if (e.code().equals(KeeperException.Code.NONODE)) {
382           LOG.warn("NONODE failed to assert ownership for " + task, e);
383         } else if (e.code().equals(KeeperException.Code.BADVERSION)) {
384           LOG.warn("BADVERSION failed to assert ownership for " + task, e);
385         } else {
386           LOG.warn("failed to assert ownership for " + task, e);
387         }
388       }
389     } catch (InterruptedException e1) {
390       LOG.warn("Interrupted while trying to assert ownership of " + task + " "
391           + StringUtils.stringifyException(e1));
392       Thread.currentThread().interrupt();
393     }
394     SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet();
395     return FAILED_TO_OWN_TASK;
396   }
397 
398   /**
399    * Wait for tasks to become available at /hbase/splitlog zknode. Grab a task one at a time. This
400    * policy puts an upper-limit on the number of simultaneous log splitting that could be happening
401    * in a cluster.
402    * <p>
403    * Synchronization using {@link #taskReadyLock} ensures that it will try to grab every task that
404    * has been put up
405    * @throws InterruptedException
406    */
407   @Override
408   public void taskLoop() throws InterruptedException {
409     while (!shouldStop) {
410       int seq_start = taskReadySeq;
411       List<String> paths = null;
412       paths = getTaskList();
413       if (paths == null) {
414         LOG.warn("Could not get tasks, did someone remove " + watcher.splitLogZNode
415             + " ... worker thread exiting.");
416         return;
417       }
418       // pick meta wal firstly
419       int offset = (int) (Math.random() * paths.size());
420       for (int i = 0; i < paths.size(); i++) {
421         if (DefaultWALProvider.isMetaFile(paths.get(i))) {
422           offset = i;
423           break;
424         }
425       }
426       int numTasks = paths.size();
427       for (int i = 0; i < numTasks; i++) {
428         int idx = (i + offset) % paths.size();
429         // don't call ZKSplitLog.getNodeName() because that will lead to
430         // double encoding of the path name
431         if (this.calculateAvailableSplitters(numTasks) > 0) {
432           grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
433         } else {
434           LOG.debug("Current region server " + server.getServerName() + " has "
435               + this.tasksInProgress.get() + " tasks in progress and can't take more.");
436           break;
437         }
438         if (shouldStop) {
439           return;
440         }
441       }
442       SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
443       synchronized (taskReadyLock) {
444         while (seq_start == taskReadySeq) {
445           taskReadyLock.wait(checkInterval);
446           if (server != null) {
447             // check to see if we have stale recovering regions in our internal memory state
448             Map<String, Region> recoveringRegions = server.getRecoveringRegions();
449             if (!recoveringRegions.isEmpty()) {
450               // Make a local copy to prevent ConcurrentModificationException when other threads
451               // modify recoveringRegions
452               List<String> tmpCopy = new ArrayList<String>(recoveringRegions.keySet());
453               int listSize = tmpCopy.size();
454               for (int i = 0; i < listSize; i++) {
455                 String region = tmpCopy.get(i);
456                 String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, region);
457                 try {
458                   if (ZKUtil.checkExists(watcher, nodePath) == -1) {
459                     server.getExecutorService().submit(
460                       new FinishRegionRecoveringHandler(server, region, nodePath));
461                   } else {
462                     // current check is a defensive(or redundant) mechanism to prevent us from
463                     // having stale recovering regions in our internal RS memory state while
464                     // zookeeper(source of truth) says differently. We stop at the first good one
465                     // because we should not have a single instance such as this in normal case so
466                     // check the first one is good enough.
467                     break;
468                   }
469                 } catch (KeeperException e) {
470                   // ignore zookeeper error
471                   LOG.debug("Got a zookeeper when trying to open a recovering region", e);
472                   break;
473                 }
474               }
475             }
476           }
477         }
478       }
479     }
480   }
481 
482   private List<String> getTaskList() throws InterruptedException {
483     List<String> childrenPaths = null;
484     long sleepTime = 1000;
485     // It will be in loop till it gets the list of children or
486     // it will come out if worker thread exited.
487     while (!shouldStop) {
488       try {
489         childrenPaths =
490             ZKUtil.listChildrenAndWatchForNewChildren(watcher,
491               watcher.splitLogZNode);
492         if (childrenPaths != null) {
493           return childrenPaths;
494         }
495       } catch (KeeperException e) {
496         LOG.warn("Could not get children of znode " + watcher.splitLogZNode, e);
497       }
498       LOG.debug("Retry listChildren of znode " + watcher.splitLogZNode
499           + " after sleep for " + sleepTime + "ms!");
500       Thread.sleep(sleepTime);
501     }
502     return childrenPaths;
503   }
504 
505   @Override
506   public void markCorrupted(Path rootDir, String name, FileSystem fs) {
507     ZKSplitLog.markCorrupted(rootDir, name, fs);
508   }
509 
510   @Override
511   public boolean isReady() throws InterruptedException {
512     int result = -1;
513     try {
514       result = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
515     } catch (KeeperException e) {
516       // ignore
517       LOG.warn("Exception when checking for " + watcher.splitLogZNode
518           + " ... retrying", e);
519     }
520     if (result == -1) {
521       LOG.info(watcher.splitLogZNode
522           + " znode does not exist, waiting for master to create");
523       Thread.sleep(1000);
524     }
525     return (result != -1);
526   }
527 
528   @Override
529   public int getTaskReadySeq() {
530     return taskReadySeq;
531   }
532 
533   @Override
534   public void registerListener() {
535     watcher.registerListener(this);
536   }
537 
538   @Override
539   public void removeListener() {
540     watcher.unregisterListener(this);
541   }
542 
543 
544   @Override
545   public void stopProcessingTasks() {
546     this.shouldStop = true;
547 
548   }
549 
550   @Override
551   public boolean isStop() {
552     return shouldStop;
553   }
554 
555   @Override
556   public RegionStoreSequenceIds getRegionFlushedSequenceId(String failedServerName, String key)
557       throws IOException {
558     return ZKSplitLog.getRegionFlushedSequenceId(watcher, failedServerName, key);
559   }
560 
561   /**
562    * Asynchronous handler for zk get-data-set-watch on node results.
563    */
564   class GetDataAsyncCallback implements AsyncCallback.DataCallback {
565     private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);
566 
567     @Override
568     public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
569       SplitLogCounters.tot_wkr_get_data_result.incrementAndGet();
570       if (rc != 0) {
571         LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
572         getDataSetWatchFailure(path);
573         return;
574       }
575       data = watcher.getRecoverableZooKeeper().removeMetaData(data);
576       getDataSetWatchSuccess(path, data);
577     }
578   }
579 
580   /*
581    * Next part is related to WALSplitterHandler
582    */
583   /**
584    * endTask() can fail and the only way to recover out of it is for the
585    * {@link org.apache.hadoop.hbase.master.SplitLogManager} to timeout the task node.
586    * @param slt
587    * @param ctr
588    */
589   @Override
590   public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) {
591     ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
592     String task = zkDetails.getTaskNode();
593     int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
594     try {
595       if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
596         LOG.info("successfully transitioned task " + task + " to final state " + slt);
597         ctr.incrementAndGet();
598         return;
599       }
600       LOG.warn("failed to transistion task " + task + " to end state " + slt
601           + " because of version mismatch ");
602     } catch (KeeperException.BadVersionException bve) {
603       LOG.warn("transisition task " + task + " to " + slt + " failed because of version mismatch",
604         bve);
605     } catch (KeeperException.NoNodeException e) {
606       LOG.fatal(
607         "logic error - end task " + task + " " + slt + " failed because task doesn't exist", e);
608     } catch (KeeperException e) {
609       LOG.warn("failed to end task, " + task + " " + slt, e);
610     }
611     SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet();
612   }
613 
614   /**
615    * When ZK-based implementation wants to complete the task, it needs to know task znode and
616    * current znode cversion (needed for subsequent update operation).
617    */
618   public static class ZkSplitTaskDetails implements SplitTaskDetails {
619     private String taskNode;
620     private MutableInt curTaskZKVersion;
621 
622     public ZkSplitTaskDetails() {
623     }
624 
625     public ZkSplitTaskDetails(String taskNode, MutableInt curTaskZKVersion) {
626       this.taskNode = taskNode;
627       this.curTaskZKVersion = curTaskZKVersion;
628     }
629 
630     public String getTaskNode() {
631       return taskNode;
632     }
633 
634     public void setTaskNode(String taskNode) {
635       this.taskNode = taskNode;
636     }
637 
638     public MutableInt getCurTaskZKVersion() {
639       return curTaskZKVersion;
640     }
641 
642     public void setCurTaskZKVersion(MutableInt curTaskZKVersion) {
643       this.curTaskZKVersion = curTaskZKVersion;
644     }
645 
646     @Override
647     public String getWALFile() {
648       return ZKSplitLog.getFileName(taskNode);
649     }
650   }
651 
652 }